diff --git a/modules/browser-node/src/services/store.ts b/modules/browser-node/src/services/store.ts index 1126994db..97d55d921 100644 --- a/modules/browser-node/src/services/store.ts +++ b/modules/browser-node/src/services/store.ts @@ -1,5 +1,6 @@ import { ChannelDispute, + ChannelUpdate, CoreChannelState, CoreTransferState, FullChannelState, @@ -42,6 +43,7 @@ const getStoreName = (publicIdentifier: string) => { }; const NON_NAMESPACED_STORE = "VectorIndexedDBDatabase"; class VectorIndexedDBDatabase extends Dexie { + updates: Dexie.Table; channels: Dexie.Table; transfers: Dexie.Table; transactions: Dexie.Table; @@ -111,29 +113,38 @@ class VectorIndexedDBDatabase extends Dexie { // Using a temp table (transactions2) to migrate which column is the primary key // (transactionHash -> id) - this.version(5).stores({ - withdrawCommitment: "transferId,channelAddress,transactionHash", - transactions2: "id, transactionHash", - }).upgrade(async tx => { - const transactions = await tx.table("transactions").toArray(); - await tx.table("transactions2").bulkAdd(transactions); - }); + this.version(5) + .stores({ + withdrawCommitment: "transferId,channelAddress,transactionHash", + transactions2: "id, transactionHash", + }) + .upgrade(async (tx) => { + const transactions = await tx.table("transactions").toArray(); + await tx.table("transactions2").bulkAdd(transactions); + }); this.version(6).stores({ - transactions: null + transactions: null, }); - this.version(7).stores({ - transactions: "id, transactionHash" - }).upgrade(async tx => { - const transactions2 = await tx.table("transactions2").toArray(); - await tx.table("transactions").bulkAdd(transactions2); - }); + this.version(7) + .stores({ + transactions: "id, transactionHash", + }) + .upgrade(async (tx) => { + const transactions2 = await tx.table("transactions2").toArray(); + await tx.table("transactions").bulkAdd(transactions2); + }); this.version(8).stores({ - transactions2: null + transactions2: null, + }); + + this.version(9).stores({ + updates: "id.id, [channelAddress+nonce]", }); + this.updates = this.table("updates"); this.channels = this.table("channels"); this.transfers = this.table("transfers"); this.transactions = this.table("transactions"); @@ -245,8 +256,9 @@ export class BrowserStore implements IEngineStore, IChainServiceStore { } async saveChannelState(channelState: FullChannelState, transfer?: FullTransferState): Promise { - await this.db.transaction("rw", this.db.channels, this.db.transfers, async () => { + await this.db.transaction("rw", this.db.channels, this.db.transfers, this.db.updates, async () => { await this.db.channels.put(channelState); + await this.db.updates.put(channelState.latestUpdate); if (channelState.latestUpdate.type === UpdateType.create) { await this.db.transfers.put({ ...transfer!, @@ -264,6 +276,11 @@ export class BrowserStore implements IEngineStore, IChainServiceStore { }); } + async getUpdateById(id: string): Promise { + const update = await this.db.updates.get(id); + return update; + } + async getChannelStates(): Promise { const channels = await this.db.channels.toArray(); return channels; @@ -356,7 +373,7 @@ export class BrowserStore implements IEngineStore, IChainServiceStore { } async getTransactionById(onchainTransactionId: string): Promise { - return await this.db.transactions.get({ id: onchainTransactionId }) + return await this.db.transactions.get({ id: onchainTransactionId }); } async getActiveTransactions(): Promise { @@ -383,30 +400,33 @@ export class BrowserStore implements IEngineStore, IChainServiceStore { attempts.push({ // TransactionResponse fields (defined when submitted) gasLimit: response.gasLimit.toString(), - gasPrice: response.gasPrice.toString(), + gasPrice: response.gasPrice.toString(), transactionHash: response.hash, createdAt: new Date(), } as StoredTransactionAttempt); - await this.db.transactions.put({ - id: onchainTransactionId, - - //// Helper fields - channelAddress, - status: StoredTransactionStatus.submitted, - reason, - - //// Provider fields - // Minimum fields (should always be defined) - to: response.to!, - from: response.from, - data: response.data, - value: response.value.toString(), - chainId: response.chainId, - nonce: response.nonce, - attempts, - } as StoredTransaction, onchainTransactionId); + await this.db.transactions.put( + { + id: onchainTransactionId, + + //// Helper fields + channelAddress, + status: StoredTransactionStatus.submitted, + reason, + + //// Provider fields + // Minimum fields (should always be defined) + to: response.to!, + from: response.from, + data: response.data, + value: response.value.toString(), + chainId: response.chainId, + nonce: response.nonce, + attempts, + } as StoredTransaction, + onchainTransactionId, + ); } async saveTransactionReceipt(onchainTransactionId: string, receipt: TransactionReceipt): Promise { diff --git a/modules/engine/src/index.ts b/modules/engine/src/index.ts index fa1f8bf5b..f62f679d0 100644 --- a/modules/engine/src/index.ts +++ b/modules/engine/src/index.ts @@ -1014,7 +1014,9 @@ export class VectorEngine implements IVectorEngine { private async addTransactionToCommitment( params: EngineParams.AddTransactionToCommitment, - ): Promise> { + ): Promise< + Result + > { const method = "addTransactionToCommitment"; const methodId = getRandomBytes32(); this.logger.info({ params, method, methodId }, "Method started"); @@ -1588,15 +1590,16 @@ export class VectorEngine implements IVectorEngine { channelAddress: string, retryCount = 5, ) { - let result: Result | undefined; - for (let i = 0; i < retryCount; i++) { - result = await fn(); - if (!result.isError) { - return result; - } - this.logger.warn({ attempt: i, error: result.getError().message, channelAddress }, "Protocol method failed"); - await delay(500); - } + const result = await fn(); + // let result: Result | undefined; + // for (let i = 0; i < retryCount; i++) { + // result = await fn(); + // if (!result.isError) { + // return result; + // } + // this.logger.warn({ attempt: i, error: result.getError().message, channelAddress }, "Protocol method failed"); + // await delay(500); + // } return result as Result; } diff --git a/modules/protocol/package.json b/modules/protocol/package.json index c92fc2b83..51496f57b 100644 --- a/modules/protocol/package.json +++ b/modules/protocol/package.json @@ -32,7 +32,8 @@ "evt": "1.9.12", "fastq": "1.11.0", "pino": "6.11.1", - "tty": "1.0.1" + "tty": "1.0.1", + "uuid": "8.3.2" }, "devDependencies": { "@types/chai": "4.2.15", diff --git a/modules/protocol/src/errors.ts b/modules/protocol/src/errors.ts index 15e86ac49..05b4c3749 100644 --- a/modules/protocol/src/errors.ts +++ b/modules/protocol/src/errors.ts @@ -39,6 +39,7 @@ export class ValidationError extends ProtocolError { TransferTimeoutBelowMin: `Transfer timeout below minimum of ${MINIMUM_TRANSFER_TIMEOUT.toString()}s`, TransferTimeoutAboveMax: `Transfer timeout above maximum of ${MAXIMUM_TRANSFER_TIMEOUT.toString()}s`, UnrecognizedType: "Unrecognized update type", + UpdateIdSigInvalid: "Update id signature is invalid", } as const; constructor( @@ -135,6 +136,7 @@ export class QueuedUpdateError extends ProtocolError { StoreFailure: "Store method failed", TransferNotActive: "Transfer not found in activeTransfers", UnhandledPromise: "Unhandled promise rejection encountered", + UpdateIdSigInvalid: "Update id signature is invalid", } as const; // TODO: improve error from result diff --git a/modules/protocol/src/sync.ts b/modules/protocol/src/sync.ts index 43dbf6066..7abe0804d 100644 --- a/modules/protocol/src/sync.ts +++ b/modules/protocol/src/sync.ts @@ -33,7 +33,7 @@ type UpdateResult = { }; export type SelfUpdateResult = UpdateResult & { - successfullyApplied: boolean; + successfullyApplied: "synced" | "executed" | "previouslyExecuted"; }; export async function outbound( @@ -178,7 +178,7 @@ export async function outbound( updatedChannel: syncedChannel, updatedActiveTransfers: syncedActiveTransfers, updatedTransfer: syncedTransfer, - successfullyApplied: false, + successfullyApplied: "synced", }); } @@ -209,7 +209,7 @@ export async function outbound( updatedChannel: { ...updatedChannel, latestUpdate: counterpartyUpdate }, updatedTransfers: updatedActiveTransfers, updatedTransfer, - successfullyApplied: true, + successfullyApplied: "executed", }); } diff --git a/modules/protocol/src/testing/sync.spec.ts b/modules/protocol/src/testing/sync.spec.ts index e9cc01c72..3e189bbf6 100644 --- a/modules/protocol/src/testing/sync.spec.ts +++ b/modules/protocol/src/testing/sync.spec.ts @@ -5,7 +5,6 @@ import { createTestChannelUpdateWithSigners, createTestChannelStateWithSigners, createTestFullHashlockTransferState, - getRandomBytes32, createTestUpdateParams, mkAddress, mkSig, @@ -14,7 +13,6 @@ import { MemoryMessagingService, getTestLoggers, createTestChannelUpdate, - createTestChannelState, } from "@connext/vector-utils"; import { UpdateType, @@ -459,6 +457,7 @@ describe("outbound", () => { let validateParamsAndApplyStub: Sinon.SinonStub; // called during sync let validateAndApplyInboundStub: Sinon.SinonStub; + let validateUpdateIdSignatureStub: Sinon.SinonStub; beforeEach(async () => { signers = Array(2) @@ -476,6 +475,9 @@ describe("outbound", () => { // Stub out all signature validation validateUpdateSignatureStub = Sinon.stub(vectorUtils, "validateChannelSignatures").resolves(Result.ok(undefined)); + validateUpdateIdSignatureStub = Sinon.stub(vectorUtils, "validateChannelUpdateIdSignature").resolves( + Result.ok(undefined), + ); }); afterEach(() => { @@ -865,9 +867,9 @@ describe("outbound", () => { log, ); - // Verify the update was successfully sent + retried + // Verify the update was successfully sent + synced expect(res.getError()).to.be.undefined; - expect(res.getValue().successfullyApplied).to.be.false; + expect(res.getValue().successfullyApplied).to.be.eq("synced"); expect(res.getValue().updatedChannel).to.be.containSubset({ nonce: toSync.nonce, latestUpdate: toSync, diff --git a/modules/protocol/src/testing/validate.spec.ts b/modules/protocol/src/testing/validate.spec.ts index ef6a48d64..123f4425f 100644 --- a/modules/protocol/src/testing/validate.spec.ts +++ b/modules/protocol/src/testing/validate.spec.ts @@ -49,6 +49,7 @@ describe("validateUpdateParams", () => { // Declare all mocks let chainReader: Sinon.SinonStubbedInstance; + let validateUpdateIdSignatureStub: Sinon.SinonStub; // Create helpers to create valid contexts const createValidSetupContext = () => { @@ -198,6 +199,10 @@ describe("validateUpdateParams", () => { chainReader = Sinon.createStubInstance(VectorChainReader); chainReader.getChannelAddress.resolves(Result.ok(channelAddress)); chainReader.create.resolves(Result.ok(true)); + + validateUpdateIdSignatureStub = Sinon.stub(vectorUtils, "validateChannelUpdateIdSignature").resolves( + Result.ok(undefined), + ); }); afterEach(() => { @@ -795,6 +800,7 @@ describe("validateAndApplyInboundUpdate", () => { let chainReader: Sinon.SinonStubbedInstance; let validateParamsAndApplyUpdateStub: Sinon.SinonStub; let validateChannelUpdateSignaturesStub: Sinon.SinonStub; + let validateUpdateIdSignatureStub: Sinon.SinonStub; let generateSignedChannelCommitmentStub: Sinon.SinonStub; let applyUpdateStub: Sinon.SinonStub; let externalValidationStub: { @@ -834,6 +840,7 @@ describe("validateAndApplyInboundUpdate", () => { // Need for double signed and single signed validateChannelUpdateSignaturesStub.resolves(Result.ok(undefined)); + validateUpdateIdSignatureStub.resolves(Result.ok(undefined)); // Needed for double signed chainReader.resolve.resolves(Result.ok({ to: [updatedChannel.alice, updatedChannel.bob], amount: ["10", "2"] })); @@ -866,6 +873,9 @@ describe("validateAndApplyInboundUpdate", () => { validateChannelUpdateSignaturesStub = Sinon.stub(vectorUtils, "validateChannelSignatures").resolves( Result.ok(undefined), ); + validateUpdateIdSignatureStub = Sinon.stub(vectorUtils, "validateChannelUpdateIdSignature").resolves( + Result.ok(undefined), + ); generateSignedChannelCommitmentStub = Sinon.stub(vectorUtils, "generateSignedChannelCommitment"); applyUpdateStub = Sinon.stub(vectorUpdate, "applyUpdate"); externalValidationStub = { diff --git a/modules/protocol/src/testing/vector.spec.ts b/modules/protocol/src/testing/vector.spec.ts index 831b1f7bb..34bb77189 100644 --- a/modules/protocol/src/testing/vector.spec.ts +++ b/modules/protocol/src/testing/vector.spec.ts @@ -44,7 +44,7 @@ describe("Vector", () => { storeService.getChannelStates.resolves([]); // Mock sync outbound Sinon.stub(vectorSync, "outbound").resolves( - Result.ok({ updatedChannel: createTestChannelState(UpdateType.setup).channel, successfullyApplied: true }), + Result.ok({ updatedChannel: createTestChannelState(UpdateType.setup).channel, successfullyApplied: "executed" }), ); }); diff --git a/modules/protocol/src/update.ts b/modules/protocol/src/update.ts index e67f96924..6c4d58e63 100644 --- a/modules/protocol/src/update.ts +++ b/modules/protocol/src/update.ts @@ -366,6 +366,7 @@ function generateSetupUpdate( meta: params.details.meta ?? {}, }, assetId: AddressZero, + id: params.id, }; return unsigned; @@ -597,7 +598,7 @@ function generateBaseUpdate( params: UpdateParams, signer: IChannelSigner, initiatorIdentifier: string, -): Pick, "channelAddress" | "nonce" | "fromIdentifier" | "toIdentifier" | "type"> { +): Pick, "channelAddress" | "nonce" | "fromIdentifier" | "toIdentifier" | "type" | "id"> { const isInitiator = signer.publicIdentifier === initiatorIdentifier; const counterparty = signer.publicIdentifier === state.bobIdentifier ? state.aliceIdentifier : state.bobIdentifier; return { @@ -606,6 +607,7 @@ function generateBaseUpdate( type: params.type, fromIdentifier: initiatorIdentifier, toIdentifier: isInitiator ? counterparty : signer.publicIdentifier, + id: params.id, }; } diff --git a/modules/protocol/src/utils.ts b/modules/protocol/src/utils.ts index 4835f7f00..77f922c10 100644 --- a/modules/protocol/src/utils.ts +++ b/modules/protocol/src/utils.ts @@ -19,6 +19,7 @@ import { UpdateParamsMap, UpdateType, ChainError, + UpdateIdentifier, } from "@connext/vector-types"; import { getAddress } from "@ethersproject/address"; import { BigNumber } from "@ethersproject/bignumber"; @@ -27,6 +28,7 @@ import { hashChannelCommitment, hashTransferState, validateChannelUpdateSignatures, + recoverAddressFromChannelMessage, } from "@connext/vector-utils"; import Ajv from "ajv"; import { BaseLogger, Level } from "pino"; @@ -73,14 +75,31 @@ export async function validateChannelSignatures( return validateChannelUpdateSignatures(state, aliceSignature, bobSignature, requiredSigners, logger); } +export async function validateChannelUpdateIdSignature( + identifier: UpdateIdentifier, + initiatorIdentifier: string, +): Promise> { + try { + const recovered = await recoverAddressFromChannelMessage(identifier.id, identifier.signature); + if (recovered !== getSignerAddressFromPublicIdentifier(initiatorIdentifier)) { + return Result.fail(new Error(``)); + } + return Result.ok(undefined); + } catch (e) { + return Result.fail(new Error(`Failed to recover signer from update id: ${e.message}`)); + } +} + export const extractContextFromStore = async ( storeService: IVectorStore, channelAddress: string, + updateId: string, ): Promise< Result< { activeTransfers: FullTransferState[]; channelState: FullChannelState | undefined; + update: ChannelUpdate | undefined; }, Error > @@ -88,6 +107,7 @@ export const extractContextFromStore = async ( // First, pull all information out from the store let activeTransfers: FullTransferState[]; let channelState: FullChannelState | undefined; + let update: ChannelUpdate | undefined; let storeMethod = "getChannelState"; try { // will always need the previous state @@ -95,6 +115,8 @@ export const extractContextFromStore = async ( // will only need active transfers for create/resolve storeMethod = "getActiveTransfers"; activeTransfers = await storeService.getActiveTransfers(channelAddress); + storeMethod = "getUpdateById"; + update = await storeService.getUpdateById(updateId); } catch (e) { return Result.fail(new Error(`${storeMethod} failed: ${e.message}`)); } @@ -102,6 +124,7 @@ export const extractContextFromStore = async ( return Result.ok({ activeTransfers, channelState, + update, }); }; @@ -191,6 +214,7 @@ export function getParamsFromUpdate( channelAddress, type, details: paramDetails as UpdateParamsMap[T], + id: update.id, }); } diff --git a/modules/protocol/src/validate.ts b/modules/protocol/src/validate.ts index d38cac77c..feadaac94 100644 --- a/modules/protocol/src/validate.ts +++ b/modules/protocol/src/validate.ts @@ -35,6 +35,7 @@ import { getNextNonceForUpdate, getParamsFromUpdate, validateChannelSignatures, + validateChannelUpdateIdSignature, validateSchema, } from "./utils"; @@ -70,7 +71,21 @@ export async function validateUpdateParams( return handleError(ValidationError.reasons.InDispute); } - const { type, channelAddress, details } = params; + const { type, channelAddress, details, id } = params; + + // if this is *not* the initiator, verify the update id sig. + // if it is, they are only hurting themselves by not providing + // it correctly + if (signer.publicIdentifier !== initiatorIdentifier) { + const recovered = await validateChannelUpdateIdSignature(id, initiatorIdentifier); + if (recovered.isError) { + return Result.fail( + new ValidationError(ValidationError.reasons.UpdateIdSigInvalid, params, previousState, { + recoveryError: jsonifyError(recovered.getError()!), + }), + ); + } + } if (previousState && channelAddress !== previousState.channelAddress) { return handleError(ValidationError.reasons.InvalidChannelAddress); @@ -406,6 +421,15 @@ export async function validateAndApplyInboundUpdate( // Handle double signed updates without validating params if (update.aliceSignature && update.bobSignature) { + // Verify the update.id.signature is correct (should be initiator) + const recovered = await validateChannelUpdateIdSignature(update.id, update.fromIdentifier); + if (recovered.isError) { + return Result.fail( + new QueuedUpdateError(QueuedUpdateError.reasons.UpdateIdSigInvalid, update, previousState, { + recoveryError: jsonifyError(recovered.getError()!), + }), + ); + } // Get final transfer balance (required when applying resolve updates); let finalTransferBalance: Balance | undefined = undefined; if (update.type === UpdateType.resolve) { diff --git a/modules/protocol/src/vector.ts b/modules/protocol/src/vector.ts index 4cc7c5a4a..c4e6de499 100644 --- a/modules/protocol/src/vector.ts +++ b/modules/protocol/src/vector.ts @@ -19,7 +19,9 @@ import { ProtocolError, jsonifyError, Values, + UpdateIdentifier, } from "@connext/vector-types"; +import { v4 as uuidV4 } from "uuid"; import { getCreate2MultisigAddress, getRandomBytes32, delay } from "@connext/vector-utils"; import { Evt } from "evt"; import pino from "pino"; @@ -152,7 +154,11 @@ export class Vector implements IVectorProtocol { return resolve({ cancelled: true, value: ret }); }); const outboundPromise = new Promise(async (resolve) => { - const storeRes = await extractContextFromStore(this.storeService, initiated.params.channelAddress); + const storeRes = await extractContextFromStore( + this.storeService, + initiated.params.channelAddress, + initiated.params.id.id, + ); if (storeRes.isError) { // Return failure return Result.fail( @@ -161,7 +167,19 @@ export class Vector implements IVectorProtocol { }), ); } - const { channelState, activeTransfers } = storeRes.getValue(); + const { channelState, activeTransfers, update } = storeRes.getValue(); + if (update && update.aliceSignature && update.bobSignature) { + // Update has already been executed, see explanation in + // types/channel.ts for `UpdateIdentifier` + const transfer = [UpdateType.create, UpdateType.resolve].includes(update.type) + ? await this.storeService.getTransferState(update.details.transferId) + : undefined; + return resolve({ + cancelled: false, + value: { updatedTransfer: transfer, updatedChannel: channelState, updatedTransfers: activeTransfers }, + successfullyApplied: "previouslyExecuted", + }); + } try { const ret = await outbound( initiated.params, @@ -233,6 +251,7 @@ export class Vector implements IVectorProtocol { role: "outbound", channelAddress: initiated.params.channelAddress, updatedChannel, + successfullyApplied, }, "Update succeeded", ); @@ -247,11 +266,11 @@ export class Vector implements IVectorProtocol { } // If the update was not applied, but the channel was synced, return // undefined so that the proposed update may be re-queued - if (!successfullyApplied) { - // Merkle root changes are undone *before* syncing + if (successfullyApplied === "synced") { return undefined; } - // All is well, return value from outbound + // All is well, return value from outbound (applies for already executed + // updates as well) return value; }; @@ -286,13 +305,20 @@ export class Vector implements IVectorProtocol { }); const inboundPromise = new Promise(async (resolve) => { // Pull context from store - const storeRes = await extractContextFromStore(this.storeService, received.update.channelAddress); + const storeRes = await extractContextFromStore( + this.storeService, + received.update.channelAddress, + received.update.id.id, + ); if (storeRes.isError) { // Send message with error return returnError(QueuedUpdateError.reasons.StoreFailure, undefined, { storeError: storeRes.getError()?.message, }); } + // NOTE: no need to validate that the update has already been executed + // because that is asserted on sync, where as an initiator you dont have + // that certainty const stored = storeRes.getValue(); channelState = stored.channelState; try { @@ -344,7 +370,7 @@ export class Vector implements IVectorProtocol { }, "Cancelling update", ); - await returnError(QueuedUpdateError.reasons.Cancelled, channelState); + // await returnError(QueuedUpdateError.reasons.Cancelled, channelState); return undefined; } const value = res.value as Result; @@ -597,6 +623,14 @@ export class Vector implements IVectorProtocol { return this; } + private async generateIdentifier(): Promise { + const id = uuidV4(); + return { + id, + signature: await this.signer.signMessage(id), + }; + } + /* * *************************** * *** CORE PUBLIC METHODS *** @@ -621,6 +655,8 @@ export class Vector implements IVectorProtocol { return Result.fail(error); } + const id = await this.generateIdentifier(); + const create2Res = await getCreate2MultisigAddress( this.publicIdentifier, params.counterpartyIdentifier, @@ -632,7 +668,7 @@ export class Vector implements IVectorProtocol { return Result.fail( new QueuedUpdateError( QueuedUpdateError.reasons.Create2Failed, - { details: params, channelAddress: "", type: UpdateType.setup }, + { details: params, channelAddress: "", type: UpdateType.setup, id }, undefined, { create2Error: create2Res.getError()?.message, @@ -647,6 +683,7 @@ export class Vector implements IVectorProtocol { channelAddress, details: params, type: UpdateType.setup, + id, }; const returnVal = await this.executeUpdate(updateParams); @@ -692,6 +729,7 @@ export class Vector implements IVectorProtocol { channelAddress: params.channelAddress, type: UpdateType.deposit, details: params, + id: await this.generateIdentifier(), }; const returnVal = await this.executeUpdate(updateParams); @@ -721,6 +759,7 @@ export class Vector implements IVectorProtocol { channelAddress: params.channelAddress, type: UpdateType.create, details: params, + id: await this.generateIdentifier(), }; const returnVal = await this.executeUpdate(updateParams); @@ -750,6 +789,7 @@ export class Vector implements IVectorProtocol { channelAddress: params.channelAddress, type: UpdateType.resolve, details: params, + id: await this.generateIdentifier(), }; const returnVal = await this.executeUpdate(updateParams); diff --git a/modules/server-node/prisma-postgres/migrations/20210602212808_add_update_id/migration.sql b/modules/server-node/prisma-postgres/migrations/20210602212808_add_update_id/migration.sql new file mode 100644 index 000000000..8db587da4 --- /dev/null +++ b/modules/server-node/prisma-postgres/migrations/20210602212808_add_update_id/migration.sql @@ -0,0 +1,17 @@ +/* + Warnings: + + - You are about to drop the column `merkleProofData` on the `update` table. All the data in the column will be lost. + - A unique constraint covering the columns `[id]` on the table `update` will be added. If there are existing duplicate values, this will fail. + +*/ +-- AlterTable +ALTER TABLE "onchain_transaction" ALTER COLUMN "id" DROP DEFAULT; + +-- AlterTable +ALTER TABLE "update" DROP COLUMN "merkleProofData", +ADD COLUMN "id" TEXT, +ADD COLUMN "idSignature" TEXT; + +-- CreateIndex +CREATE UNIQUE INDEX "update.id_unique" ON "update"("id"); diff --git a/modules/server-node/prisma-postgres/schema.prisma b/modules/server-node/prisma-postgres/schema.prisma index 1322bb82c..14be53c8b 100644 --- a/modules/server-node/prisma-postgres/schema.prisma +++ b/modules/server-node/prisma-postgres/schema.prisma @@ -79,6 +79,9 @@ model Channel { model Update { // COMMON PARAMS + id String? + idSignature String? + // id params optional for restoring transfers (needs create update) channelAddress String? channel Channel? @relation(fields: [channelAddress], references: [channelAddress]) channelAddressId String // required for ID so that relation can be removed @@ -127,6 +130,7 @@ model Update { resolvedTransfer Transfer? @relation("ResolvedTransfer") @@id([channelAddressId, nonce]) + @@unique(id) @@map(name: "update") } diff --git a/modules/server-node/prisma-sqlite/migrations/20210602212112_add_update_id/migration.sql b/modules/server-node/prisma-sqlite/migrations/20210602212112_add_update_id/migration.sql new file mode 100644 index 000000000..3ed5286ce --- /dev/null +++ b/modules/server-node/prisma-sqlite/migrations/20210602212112_add_update_id/migration.sql @@ -0,0 +1,51 @@ +/* + Warnings: + + - You are about to drop the column `merkleProofData` on the `update` table. All the data in the column will be lost. + +*/ +-- RedefineTables +PRAGMA foreign_keys=OFF; +CREATE TABLE "new_update" ( + "id" TEXT, + "idSignature" TEXT, + "channelAddress" TEXT, + "channelAddressId" TEXT NOT NULL, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "fromIdentifier" TEXT NOT NULL, + "toIdentifier" TEXT NOT NULL, + "type" TEXT NOT NULL, + "nonce" INTEGER NOT NULL, + "amountA" TEXT NOT NULL, + "amountB" TEXT NOT NULL, + "toA" TEXT NOT NULL, + "toB" TEXT NOT NULL, + "assetId" TEXT NOT NULL, + "signatureA" TEXT, + "signatureB" TEXT, + "totalDepositsAlice" TEXT, + "totalDepositsBob" TEXT, + "transferAmountA" TEXT, + "transferAmountB" TEXT, + "transferToA" TEXT, + "transferToB" TEXT, + "transferId" TEXT, + "transferDefinition" TEXT, + "transferTimeout" TEXT, + "transferInitialState" TEXT, + "transferEncodings" TEXT, + "meta" TEXT, + "responder" TEXT, + "transferResolver" TEXT, + "merkleRoot" TEXT, + + PRIMARY KEY ("channelAddressId", "nonce"), + FOREIGN KEY ("channelAddress") REFERENCES "channel" ("channelAddress") ON DELETE SET NULL ON UPDATE CASCADE +); +INSERT INTO "new_update" ("channelAddress", "channelAddressId", "createdAt", "fromIdentifier", "toIdentifier", "type", "nonce", "amountA", "amountB", "toA", "toB", "assetId", "signatureA", "signatureB", "totalDepositsAlice", "totalDepositsBob", "transferAmountA", "transferAmountB", "transferToA", "transferToB", "transferId", "transferDefinition", "transferTimeout", "transferInitialState", "transferEncodings", "meta", "responder", "transferResolver", "merkleRoot") SELECT "channelAddress", "channelAddressId", "createdAt", "fromIdentifier", "toIdentifier", "type", "nonce", "amountA", "amountB", "toA", "toB", "assetId", "signatureA", "signatureB", "totalDepositsAlice", "totalDepositsBob", "transferAmountA", "transferAmountB", "transferToA", "transferToB", "transferId", "transferDefinition", "transferTimeout", "transferInitialState", "transferEncodings", "meta", "responder", "transferResolver", "merkleRoot" FROM "update"; +DROP TABLE "update"; +ALTER TABLE "new_update" RENAME TO "update"; +CREATE UNIQUE INDEX "update.id_unique" ON "update"("id"); +CREATE UNIQUE INDEX "update_channelAddress_unique" ON "update"("channelAddress"); +PRAGMA foreign_key_check; +PRAGMA foreign_keys=ON; diff --git a/modules/server-node/prisma-sqlite/schema.prisma b/modules/server-node/prisma-sqlite/schema.prisma index c74f45bb1..2ed364a4c 100644 --- a/modules/server-node/prisma-sqlite/schema.prisma +++ b/modules/server-node/prisma-sqlite/schema.prisma @@ -79,6 +79,9 @@ model Channel { model Update { // COMMON PARAMS + id String? + idSignature String? + // id params optional for restoring transfers (needs create update) channelAddress String? channel Channel? @relation(fields: [channelAddress], references: [channelAddress]) channelAddressId String // required for ID so that relation can be removed @@ -127,6 +130,7 @@ model Update { resolvedTransfer Transfer? @relation("ResolvedTransfer") @@id([channelAddressId, nonce]) + @@unique(id) @@map(name: "update") } diff --git a/modules/server-node/src/services/store.ts b/modules/server-node/src/services/store.ts index 7b905ff9d..80491618b 100644 --- a/modules/server-node/src/services/store.ts +++ b/modules/server-node/src/services/store.ts @@ -18,6 +18,7 @@ import { GetTransfersFilterOpts, StoredTransactionAttempt, StoredTransactionReceipt, + ChannelUpdate, } from "@connext/vector-types"; import { getRandomBytes32, getSignerAddressFromPublicIdentifier, mkSig } from "@connext/vector-utils"; import { BigNumber } from "@ethersproject/bignumber"; @@ -88,6 +89,71 @@ const convertOnchainTransactionEntityToTransaction = ( }; }; +const convertUpdateEntityToChannelUpdate = (entity: Update & { channel: Channel | null }): ChannelUpdate => { + let details: SetupUpdateDetails | DepositUpdateDetails | CreateUpdateDetails | ResolveUpdateDetails | undefined; + switch (entity.type) { + case "setup": + details = { + networkContext: { + chainId: BigNumber.from(entity.channel!.chainId).toNumber(), + channelFactoryAddress: entity.channel!.channelFactoryAddress, + transferRegistryAddress: entity.channel!.transferRegistryAddress, + }, + timeout: entity.channel!.timeout, + } as SetupUpdateDetails; + break; + case "deposit": + details = { + totalDepositsAlice: entity.totalDepositsAlice, + totalDepositsBob: entity.totalDepositsBob, + } as DepositUpdateDetails; + break; + case "create": + details = { + balance: { + to: [entity.transferToA!, entity.transferToB!], + amount: [entity.transferAmountA!, entity.transferAmountB!], + }, + merkleRoot: entity.merkleRoot!, + transferDefinition: entity.transferDefinition!, + transferTimeout: entity.transferTimeout!, + transferId: entity.transferId!, + transferEncodings: entity.transferEncodings!.split("$"), + transferInitialState: JSON.parse(entity.transferInitialState!), + meta: entity.meta ? JSON.parse(entity.meta) : undefined, + } as CreateUpdateDetails; + break; + case "resolve": + details = { + merkleRoot: entity.merkleRoot!, + transferDefinition: entity.transferDefinition!, + transferId: entity.transferId!, + transferResolver: JSON.parse(entity.transferResolver!), + meta: entity.meta ? JSON.parse(entity.meta) : undefined, + } as ResolveUpdateDetails; + break; + } + return { + id: { + id: entity.id!, + signature: entity.idSignature!, + }, + assetId: entity.assetId, + balance: { + amount: [entity.amountA, entity.amountB], + to: [entity.toA, entity.toB], + }, + channelAddress: entity.channelAddressId, + details, + fromIdentifier: entity.fromIdentifier, + nonce: entity.nonce, + aliceSignature: entity.signatureA ?? undefined, + bobSignature: entity.signatureB ?? undefined, + toIdentifier: entity.toIdentifier, + type: entity.type as keyof typeof UpdateType, + }; +}; + const convertChannelEntityToFullChannelState = ( channelEntity: Channel & { balances: BalanceEntity[]; @@ -119,51 +185,9 @@ const convertChannelEntityToFullChannelState = ( }); // convert db representation into details for the particular update - let details: SetupUpdateDetails | DepositUpdateDetails | CreateUpdateDetails | ResolveUpdateDetails | undefined; - if (channelEntity.latestUpdate) { - switch (channelEntity.latestUpdate.type) { - case "setup": - details = { - networkContext: { - chainId: BigNumber.from(channelEntity.chainId).toNumber(), - channelFactoryAddress: channelEntity.channelFactoryAddress, - transferRegistryAddress: channelEntity.transferRegistryAddress, - }, - timeout: channelEntity.timeout, - } as SetupUpdateDetails; - break; - case "deposit": - details = { - totalDepositsAlice: channelEntity.latestUpdate.totalDepositsAlice, - totalDepositsBob: channelEntity.latestUpdate.totalDepositsBob, - } as DepositUpdateDetails; - break; - case "create": - details = { - balance: { - to: [channelEntity.latestUpdate.transferToA!, channelEntity.latestUpdate.transferToB!], - amount: [channelEntity.latestUpdate.transferAmountA!, channelEntity.latestUpdate.transferAmountB!], - }, - merkleRoot: channelEntity.latestUpdate.merkleRoot!, - transferDefinition: channelEntity.latestUpdate.transferDefinition!, - transferTimeout: channelEntity.latestUpdate.transferTimeout!, - transferId: channelEntity.latestUpdate.transferId!, - transferEncodings: channelEntity.latestUpdate.transferEncodings!.split("$"), - transferInitialState: JSON.parse(channelEntity.latestUpdate.transferInitialState!), - meta: channelEntity.latestUpdate!.meta ? JSON.parse(channelEntity.latestUpdate!.meta) : undefined, - } as CreateUpdateDetails; - break; - case "resolve": - details = { - merkleRoot: channelEntity.latestUpdate.merkleRoot!, - transferDefinition: channelEntity.latestUpdate.transferDefinition!, - transferId: channelEntity.latestUpdate.transferId!, - transferResolver: JSON.parse(channelEntity.latestUpdate.transferResolver!), - meta: channelEntity.latestUpdate!.meta ? JSON.parse(channelEntity.latestUpdate!.meta) : undefined, - } as ResolveUpdateDetails; - break; - } - } + const latestUpdate = !!channelEntity.latestUpdate + ? convertUpdateEntityToChannelUpdate({ ...channelEntity.latestUpdate, channel: channelEntity }) + : undefined; const channel: FullChannelState = { assetIds, @@ -184,21 +208,7 @@ const convertChannelEntityToFullChannelState = ( bob: channelEntity.participantB, bobIdentifier: channelEntity.publicIdentifierB, timeout: channelEntity.timeout, - latestUpdate: { - assetId: channelEntity.latestUpdate!.assetId, - balance: { - amount: [channelEntity.latestUpdate!.amountA, channelEntity.latestUpdate!.amountB], - to: [channelEntity.latestUpdate!.toA, channelEntity.latestUpdate!.toB], - }, - channelAddress: channelEntity.channelAddress, - details, - fromIdentifier: channelEntity.latestUpdate!.fromIdentifier, - nonce: channelEntity.latestUpdate!.nonce, - aliceSignature: channelEntity.latestUpdate!.signatureA ?? undefined, - bobSignature: channelEntity.latestUpdate!.signatureB ?? undefined, - toIdentifier: channelEntity.latestUpdate!.toIdentifier, - type: channelEntity.latestUpdate!.type as "create" | "deposit" | "resolve" | "setup", - }, + latestUpdate: latestUpdate as any, inDispute: !!channelEntity.dispute, }; return channel; @@ -641,6 +651,14 @@ export class PrismaStore implements IServerNodeStore { await this.prisma.$disconnect(); } + async getUpdateById(id: string): Promise { + const entity = await this.prisma.update.findUnique({ where: { id }, include: { channel: true } }); + if (!entity) { + return undefined; + } + return convertUpdateEntityToChannelUpdate(entity); + } + async getChannelState(channelAddress: string): Promise { const channelEntity = await this.prisma.channel.findUnique({ where: { channelAddress }, @@ -831,6 +849,8 @@ export class PrismaStore implements IServerNodeStore { : undefined, }, create: { + id: channelState.latestUpdate.id.id, + idSignature: channelState.latestUpdate.id.signature, channelAddressId: channelState.channelAddress, channel: { connect: { channelAddress: channelState.channelAddress } }, fromIdentifier: channelState.latestUpdate.fromIdentifier, @@ -939,6 +959,8 @@ export class PrismaStore implements IServerNodeStore { let latestUpdateModel: Prisma.UpdateCreateInput | undefined; if (channel.latestUpdate) { latestUpdateModel = { + id: channel.latestUpdate.id.id, + idSignature: channel.latestUpdate.id.signature, channelAddressId: channel.channelAddress, fromIdentifier: channel.latestUpdate!.fromIdentifier, toIdentifier: channel.latestUpdate!.toIdentifier, diff --git a/modules/test-runner/src/load/helpers/agent.ts b/modules/test-runner/src/load/helpers/agent.ts index 954602cef..8256847ed 100644 --- a/modules/test-runner/src/load/helpers/agent.ts +++ b/modules/test-runner/src/load/helpers/agent.ts @@ -12,7 +12,6 @@ import { BigNumber, constants, Contract, providers, Wallet, utils } from "ethers import { formatEther, parseUnits } from "ethers/lib/utils"; import { Evt } from "evt"; import PriorityQueue from "p-queue"; -import { jsonifyError } from "../../../../types/dist/src"; import { env, getRandomIndex } from "../../utils"; @@ -24,7 +23,7 @@ const provider = new providers.JsonRpcProvider(env.chainProviders[chainId]); const wallet = Wallet.fromMnemonic(env.sugarDaddy).connect(provider); const transferAmount = "1"; //utils.parseEther("0.00001").toString(); const agentBalance = utils.parseEther("0.0005").toString(); -const routerBalance = utils.parseEther("0.15"); +const routerBalance = utils.parseEther("0.3"); const walletQueue = new PriorityQueue({ concurrency: 1 }); @@ -509,7 +508,8 @@ export class AgentManager { this.transferInfo[routingId].end = Date.now(); // If it was cancelled, mark as failure - if (Object.values(data.transfer.transferResolver)[0] === constants.HashZero) { + const cancelled = Object.values(data.transfer.transferResolver)[0] === constants.HashZero; + if (cancelled) { logger.warn( { transferId: transfer.transferId, @@ -531,7 +531,7 @@ export class AgentManager { } // Only create a new transfer IFF you resolved it - if (agent.signerAddress === transfer.initiator) { + if (agent.signerAddress === transfer.initiator && !cancelled) { logger.debug( { transfer: transfer.transferId, diff --git a/modules/types/src/channel.ts b/modules/types/src/channel.ts index d6bac0d34..155d6123c 100644 --- a/modules/types/src/channel.ts +++ b/modules/types/src/channel.ts @@ -61,11 +61,30 @@ export interface UpdateParamsMap { [UpdateType.setup]: SetupParams; } +// When generating an update from params, you need to create an +// identifier to make sure the update remains idempotent. Imagine +// without this and you are trying to apply a `create` update. +// In this case, there is no way to know whether or not you have +// already created the transfer (the `transferId` is not generated +// until you know the nonce the proposed update is executed at). +// This leads to an edgecase where a transfer is created by someone +// who does not hold priority, and installed by the responder. The +// responder then inserts their own update (thereby cancelling yours) +// and you reinsert your "create" update into the queue (causing the +// same transfer to be created 2x). You sign the update identifier so +// you dont run into this problem again when syncing an update and the +// id has been tampered with. +export type UpdateIdentifier = { + id: string; + signature: string; +}; + // Protocol update export type UpdateParams = { channelAddress: string; type: T; details: UpdateParamsMap[T]; + id: UpdateIdentifier; }; export type Balance = { @@ -172,6 +191,7 @@ export type NetworkContext = ContractAddresses & { }; export type ChannelUpdate = { + id: UpdateIdentifier; // signed by update.fromIdentifier channelAddress: string; fromIdentifier: string; toIdentifier: string; diff --git a/modules/types/src/store.ts b/modules/types/src/store.ts index 443629fcb..68509adc7 100644 --- a/modules/types/src/store.ts +++ b/modules/types/src/store.ts @@ -1,7 +1,7 @@ import { TransactionReceipt, TransactionResponse } from "@ethersproject/abstract-provider"; import { WithdrawCommitmentJson } from "./transferDefinitions/withdraw"; -import { FullTransferState, FullChannelState } from "./channel"; +import { FullTransferState, FullChannelState, ChannelUpdate } from "./channel"; import { Address } from "./basic"; import { ChannelDispute, TransferDispute } from "./dispute"; import { GetTransfersFilterOpts } from "./schemas/engine"; @@ -28,6 +28,7 @@ export interface IVectorStore { getActiveTransfers(channelAddress: string): Promise; getTransferState(transferId: string): Promise; getTransfers(filterOpts?: GetTransfersFilterOpts): Promise; + getUpdateById(id: string): Promise; // Setters saveChannelState(channelState: FullChannelState, transfer?: FullTransferState): Promise; diff --git a/modules/utils/package.json b/modules/utils/package.json index 737ecc450..ff5e39377 100644 --- a/modules/utils/package.json +++ b/modules/utils/package.json @@ -44,7 +44,8 @@ "merkletreejs": "0.2.18", "pino": "6.11.1", "pino-pretty": "4.6.0", - "ts-natsutil": "1.1.1" + "ts-natsutil": "1.1.1", + "uuid": "8.3.2" }, "devDependencies": { "@babel/polyfill": "7.12.1", diff --git a/modules/utils/src/test/channel.ts b/modules/utils/src/test/channel.ts index c5f56cbb2..7da0d9c96 100644 --- a/modules/utils/src/test/channel.ts +++ b/modules/utils/src/test/channel.ts @@ -15,6 +15,7 @@ import { FullTransferState, DEFAULT_TRANSFER_TIMEOUT, } from "@connext/vector-types"; +import { v4 as uuidV4 } from "uuid"; import { ChannelSigner } from "../channelSigner"; @@ -44,6 +45,11 @@ export function createTestUpdateParams( const base = { channelAddress: overrides.channelAddress ?? mkAddress("0xccc"), type, + id: { + id: uuidV4(), + signature: mkSig("0xcceeffaa6655"), + ...(overrides.id ?? {}), + }, }; let details: any; @@ -117,6 +123,10 @@ export function createTestChannelUpdate( bobSignature: mkSig("0x0002"), toIdentifier: mkPublicIdentifier("vectorB"), type, + id: { + id: uuidV4(), + signature: mkSig("0x00003"), + }, }; // Get details from overrides diff --git a/modules/utils/src/test/services/store.ts b/modules/utils/src/test/services/store.ts index 0b96e0d8f..0c01bece1 100644 --- a/modules/utils/src/test/services/store.ts +++ b/modules/utils/src/test/services/store.ts @@ -11,6 +11,7 @@ import { GetTransfersFilterOpts, CoreChannelState, CoreTransferState, + ChannelUpdate, } from "@connext/vector-types"; import { TransactionReceipt, TransactionResponse } from "@ethersproject/abstract-provider"; @@ -97,6 +98,7 @@ export class MemoryStoreService implements IEngineStore { // Map private channelStates: Map = new Map(); + private updates: Map = new Map(); private schemaVersion: number | undefined = undefined; @@ -118,6 +120,10 @@ export class MemoryStoreService implements IEngineStore { return Promise.resolve(); } + getUpdateById(id: string): Promise { + return Promise.resolve(this.updates.get(id)); + } + getChannelState(channelAddress: string): Promise { const state = this.channelStates.get(channelAddress); return Promise.resolve(state); @@ -142,6 +148,9 @@ export class MemoryStoreService implements IEngineStore { } saveChannelState(channelState: FullChannelState, transfer?: FullTransferState): Promise { + if (channelState.latestUpdate) { + this.updates.set(channelState.latestUpdate.id.id, channelState.latestUpdate); + } this.channelStates.set(channelState.channelAddress, { ...channelState, });