diff --git a/modules/browser-node/src/index.ts b/modules/browser-node/src/index.ts index 8cc9be69f..1c9f754ec 100644 --- a/modules/browser-node/src/index.ts +++ b/modules/browser-node/src/index.ts @@ -24,7 +24,6 @@ import { constructRpcRequest, hydrateProviders, NatsMessagingService } from "@co import pino, { BaseLogger } from "pino"; import { BrowserStore } from "./services/store"; -import { BrowserLockService } from "./services/lock"; import { DirectProvider, IframeChannelProvider, IRpcChannelProvider } from "./channelProvider"; import { BrowserNodeError } from "./errors"; export * from "./constants"; @@ -108,11 +107,6 @@ export class BrowserNode implements INodeService { config.signer.publicIdentifier, config.logger.child({ module: "BrowserStore" }), ); - const lock = new BrowserLockService( - config.signer.publicIdentifier, - messaging, - config.logger.child({ module: "BrowserLockService" }), - ); const chainService = new VectorChainService( store, chainJsonProviders, @@ -146,7 +140,6 @@ export class BrowserNode implements INodeService { const engine = await VectorEngine.connect( messaging, - lock, store, config.signer, chainService, diff --git a/modules/browser-node/src/services/lock.ts b/modules/browser-node/src/services/lock.ts deleted file mode 100644 index 7d1698e27..000000000 --- a/modules/browser-node/src/services/lock.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { ILockService, IMessagingService, Result, jsonifyError } from "@connext/vector-types"; -import { BaseLogger } from "pino"; - -import { BrowserNodeLockError } from "../errors"; - -export class BrowserLockService implements ILockService { - constructor( - private readonly publicIdentifier: string, - private readonly messagingService: IMessagingService, - private readonly log: BaseLogger, - ) {} - - async acquireLock(lockName: string, isAlice?: boolean, counterpartyPublicIdentifier?: string): Promise { - if (!counterpartyPublicIdentifier) { - throw new BrowserNodeLockError(BrowserNodeLockError.reasons.CounterpartyIdentifierMissing, lockName); - } - if (isAlice) { - throw new BrowserNodeLockError(BrowserNodeLockError.reasons.CannotBeAlice, lockName); - } - - const res = await this.messagingService.sendLockMessage( - Result.ok({ type: "acquire", lockName }), - counterpartyPublicIdentifier!, - this.publicIdentifier, - ); - if (res.isError) { - throw new BrowserNodeLockError(BrowserNodeLockError.reasons.AcquireMessageFailed, lockName, "", { - error: jsonifyError(res.getError()!), - }); - } - const { lockValue } = res.getValue(); - if (!lockValue) { - throw new BrowserNodeLockError(BrowserNodeLockError.reasons.SentMessageAcquisitionFailed, lockName); - } - this.log.debug({ method: "acquireLock", lockName, lockValue }, "Acquired lock"); - return lockValue; - } - - async releaseLock( - lockName: string, - lockValue: string, - isAlice?: boolean, - counterpartyPublicIdentifier?: string, - ): Promise { - if (!counterpartyPublicIdentifier) { - throw new BrowserNodeLockError(BrowserNodeLockError.reasons.CounterpartyIdentifierMissing, lockName, lockValue); - } - if (isAlice) { - throw new BrowserNodeLockError(BrowserNodeLockError.reasons.CannotBeAlice, lockName, lockValue); - } - - const result = await this.messagingService.sendLockMessage( - Result.ok({ type: "release", lockName, lockValue }), - counterpartyPublicIdentifier!, - this.publicIdentifier, - ); - if (result.isError) { - throw new BrowserNodeLockError(BrowserNodeLockError.reasons.ReleaseMessageFailed, lockName, "", { - error: jsonifyError(result.getError()!), - }); - } - this.log.debug({ method: "releaseLock", lockName, lockValue }, "Released lock"); - } -} diff --git a/modules/engine/src/index.ts b/modules/engine/src/index.ts index 0cbf3fa56..c3db236e5 100644 --- a/modules/engine/src/index.ts +++ b/modules/engine/src/index.ts @@ -2,7 +2,6 @@ import { Vector } from "@connext/vector-protocol"; import { ChainAddresses, IChannelSigner, - ILockService, IMessagingService, IVectorProtocol, Result, @@ -19,7 +18,6 @@ import { ChannelRpcMethods, IExternalValidation, AUTODEPLOY_CHAIN_IDS, - FullChannelState, EngineError, UpdateType, Values, @@ -74,13 +72,11 @@ export class VectorEngine implements IVectorEngine { private readonly vector: IVectorProtocol, private readonly chainService: IVectorChainService, private readonly chainAddresses: ChainAddresses, - private readonly lockService: ILockService, private readonly logger: pino.BaseLogger, ) {} static async connect( messaging: IMessagingService, - lock: ILockService, store: IEngineStore, signer: IChannelSigner, chainService: IVectorChainService, @@ -92,7 +88,6 @@ export class VectorEngine implements IVectorEngine { ): Promise { const vector = await Vector.connect( messaging, - lock, store, signer, chainService, @@ -107,7 +102,6 @@ export class VectorEngine implements IVectorEngine { vector, chainService, chainAddresses, - lock, logger.child({ module: "VectorEngine" }), ); await engine.setupListener(gasSubsidyPercentage); @@ -140,59 +134,10 @@ export class VectorEngine implements IVectorEngine { this.chainAddresses, this.logger, this.setup.bind(this), - this.acquireRestoreLocks.bind(this), - this.releaseRestoreLocks.bind(this), gasSubsidyPercentage, ); } - private async acquireRestoreLocks(channel: FullChannelState): Promise> { - if (this.restoreLocks[channel.channelAddress]) { - // Has already been released, return undefined - return Result.ok(this.restoreLocks[channel.channelAddress]); - } - try { - const isAlice = channel.alice === this.signer.address; - const lockVal = await this.lockService.acquireLock( - channel.channelAddress, - isAlice, - isAlice ? channel.bobIdentifier : channel.aliceIdentifier, - ); - this.restoreLocks[channel.channelAddress] = lockVal; - return Result.ok(undefined); - } catch (e) { - return Result.fail( - new RestoreError(RestoreError.reasons.AcquireLockError, channel.channelAddress, this.signer.publicIdentifier, { - acquireRestoreLockError: e.message, - }), - ); - } - } - - private async releaseRestoreLocks(channel: FullChannelState): Promise> { - if (!this.restoreLocks[channel.channelAddress]) { - // Has already been released, return undefined - return Result.ok(undefined); - } - try { - const isAlice = channel.alice === this.signer.address; - await this.lockService.releaseLock( - channel.channelAddress, - this.restoreLocks[channel.channelAddress], - isAlice, - isAlice ? channel.bobIdentifier : channel.aliceIdentifier, - ); - delete this.restoreLocks[channel.channelAddress]; - return Result.ok(undefined); - } catch (e) { - return Result.fail( - new RestoreError(RestoreError.reasons.ReleaseLockError, channel.channelAddress, this.signer.publicIdentifier, { - releaseRestoreLockError: e.message, - }), - ); - } - } - private async getConfig(): Promise< Result > { diff --git a/modules/engine/src/listeners.ts b/modules/engine/src/listeners.ts index 6b10e94a3..5f2a84520 100644 --- a/modules/engine/src/listeners.ts +++ b/modules/engine/src/listeners.ts @@ -60,8 +60,6 @@ export async function setupEngineListeners( setup: ( params: EngineParams.Setup, ) => Promise>, - acquireRestoreLocks: (channel: FullChannelState) => Promise>, - releaseRestoreLocks: (channel: FullChannelState) => Promise>, gasSubsidyPercentage: number, ): Promise { // Set up listener for channel setup @@ -175,107 +173,7 @@ export async function setupEngineListeners( const method = "onReceiveRestoreStateMessage"; logger.debug({ method }, "Handling message"); - // releases the lock, and acks to senders confirmation message - const releaseLockAndAck = async (channelAddress: string, postToEvt = false) => { - const channel = await store.getChannelState(channelAddress); - if (!channel) { - logger.error({ channelAddress }, "Failed to find channel to release lock"); - return; - } - await releaseRestoreLocks(channel); - await messaging.respondToRestoreStateMessage(inbox, Result.ok(undefined)); - if (postToEvt) { - // Post to evt - evts[EngineEvents.RESTORE_STATE_EVENT].post({ - channelAddress: channel.channelAddress, - aliceIdentifier: channel.aliceIdentifier, - bobIdentifier: channel.bobIdentifier, - chainId: channel.networkContext.chainId, - }); - } - return; - }; - - // Received error from counterparty - if (restoreData.isError) { - // releasing the lock should be done regardless of error - logger.error({ message: restoreData.getError()!.message, method }, "Error received from counterparty restore"); - await releaseLockAndAck(restoreData.getError()!.context.channelAddress); - return; - } - - const data = restoreData.getValue(); - const [key] = Object.keys(data ?? []); - if (key !== "chainId" && key !== "channelAddress") { - logger.error({ data }, "Message malformed"); - return; - } - - if (key === "channelAddress") { - const { channelAddress } = data as { channelAddress: string }; - await releaseLockAndAck(channelAddress, true); - return; - } - - // Otherwise, they are looking to initiate a sync - let channel: FullChannelState | undefined; - const sendCannotRestoreFromError = (error: Values, context: any = {}) => { - return messaging.respondToRestoreStateMessage( - inbox, - Result.fail( - new RestoreError(error, channel?.channelAddress ?? "", signer.publicIdentifier, { ...context, method }), - ), - ); - }; - - // Get info from store to send to counterparty - const { chainId } = data as any; - try { - channel = await store.getChannelStateByParticipants(signer.publicIdentifier, from, chainId); - } catch (e) { - return sendCannotRestoreFromError(RestoreError.reasons.CouldNotGetChannel, { - storeMethod: "getChannelStateByParticipants", - chainId, - identifiers: [signer.publicIdentifier, from], - }); - } - if (!channel) { - return sendCannotRestoreFromError(RestoreError.reasons.ChannelNotFound, { chainId }); - } - let activeTransfers: FullTransferState[]; - try { - activeTransfers = await store.getActiveTransfers(channel.channelAddress); - } catch (e) { - return sendCannotRestoreFromError(RestoreError.reasons.CouldNotGetActiveTransfers, { - storeMethod: "getActiveTransfers", - chainId, - channelAddress: channel.channelAddress, - }); - } - - // Acquire lock - const res = await acquireRestoreLocks(channel); - if (res.isError) { - return sendCannotRestoreFromError(RestoreError.reasons.AcquireLockError, { - acquireLockError: jsonifyError(res.getError()!), - }); - } - - // Send info to counterparty - logger.debug( - { - channel: channel.channelAddress, - nonce: channel.nonce, - activeTransfers: activeTransfers.map((a) => a.transferId), - }, - "Sending counterparty state to sync", - ); - await messaging.respondToRestoreStateMessage(inbox, Result.ok({ channel, activeTransfers })); - - // Release lock on timeout regardless - setTimeout(() => { - releaseRestoreLocks(channel!); - }, 15_000); + throw new Error("call to protocol to add to internal queue"); }, ); diff --git a/modules/engine/src/testing/index.spec.ts b/modules/engine/src/testing/index.spec.ts index ddc741dcb..646f2670d 100644 --- a/modules/engine/src/testing/index.spec.ts +++ b/modules/engine/src/testing/index.spec.ts @@ -6,7 +6,6 @@ import { getTestLoggers, MemoryStoreService, MemoryMessagingService, - MemoryLockService, getRandomBytes32, mkPublicIdentifier, mkAddress, @@ -51,7 +50,6 @@ describe("VectorEngine", () => { it("should connect without validation", async () => { const engine = await VectorEngine.connect( Sinon.createStubInstance(MemoryMessagingService), - Sinon.createStubInstance(MemoryLockService), storeService, getRandomChannelSigner(), chainService as IVectorChainService, @@ -66,7 +64,6 @@ describe("VectorEngine", () => { it("should connect with validation", async () => { const engine = await VectorEngine.connect( Sinon.createStubInstance(MemoryMessagingService), - Sinon.createStubInstance(MemoryLockService), storeService, getRandomChannelSigner(), chainService as IVectorChainService, @@ -156,7 +153,6 @@ describe("VectorEngine", () => { it(test.name, async () => { const engine = await VectorEngine.connect( Sinon.createStubInstance(MemoryMessagingService), - Sinon.createStubInstance(MemoryLockService), storeService, getRandomChannelSigner(), chainService as IVectorChainService, @@ -195,7 +191,6 @@ describe("VectorEngine", () => { it(test.name, async () => { const engine = await VectorEngine.connect( Sinon.createStubInstance(MemoryMessagingService), - Sinon.createStubInstance(MemoryLockService), storeService, getRandomChannelSigner(), chainService as IVectorChainService, @@ -809,7 +804,6 @@ describe("VectorEngine", () => { it(test.name, async () => { const engine = await VectorEngine.connect( Sinon.createStubInstance(MemoryMessagingService), - Sinon.createStubInstance(MemoryLockService), storeService, getRandomChannelSigner(), chainService as IVectorChainService, diff --git a/modules/engine/src/testing/listeners.spec.ts b/modules/engine/src/testing/listeners.spec.ts index 683b0fa09..b7fa7d9e0 100644 --- a/modules/engine/src/testing/listeners.spec.ts +++ b/modules/engine/src/testing/listeners.spec.ts @@ -345,8 +345,6 @@ describe(testName, () => { chainAddresses, log, () => Promise.resolve(Result.ok({} as any)), - acquireRestoreLockStub, - releaseRestoreLockStub, gasSubsidyPercentage, ); @@ -464,8 +462,6 @@ describe(testName, () => { chainAddresses, log, () => Promise.resolve(Result.ok({} as any)), - acquireRestoreLockStub, - releaseRestoreLockStub, 50, ); diff --git a/modules/protocol/src/sync.ts b/modules/protocol/src/sync.ts index 3699111de..4bd930546 100644 --- a/modules/protocol/src/sync.ts +++ b/modules/protocol/src/sync.ts @@ -14,7 +14,7 @@ import { MessagingError, jsonifyError, } from "@connext/vector-types"; -import { getRandomBytes32, LOCK_TTL } from "@connext/vector-utils"; +import { getRandomBytes32 } from "@connext/vector-utils"; import pino from "pino"; import { InboundChannelUpdateError, OutboundChannelUpdateError } from "./errors"; diff --git a/modules/protocol/src/testing/utils/channel.ts b/modules/protocol/src/testing/utils/channel.ts index 14bb2316d..f42d4f75b 100644 --- a/modules/protocol/src/testing/utils/channel.ts +++ b/modules/protocol/src/testing/utils/channel.ts @@ -2,7 +2,6 @@ import { ChannelFactory, TestToken, VectorChannel, VectorChainReader } from "@co import { FullChannelState, IChannelSigner, - ILockService, IMessagingService, IVectorProtocol, IVectorStore, @@ -16,7 +15,6 @@ import { getTestLoggers, expect, MemoryStoreService, - MemoryLockService, MemoryMessagingService, getSignerAddressFromPublicIdentifier, } from "@connext/vector-utils"; @@ -33,7 +31,6 @@ import { fundAddress } from "./funding"; type VectorTestOverrides = { messagingService: IMessagingService; - lockService: ILockService; storeService: IVectorStore; signer: IChannelSigner; chainReader: IVectorChainReader; @@ -43,7 +40,6 @@ type VectorTestOverrides = { // NOTE: when operating with three counterparties, they must // all share a messaging service const sharedMessaging = new MemoryMessagingService(); -const sharedLock = new MemoryLockService(); const sharedChain = new VectorChainReader({ [chainId]: provider }, Pino()); export const createVectorInstances = async ( @@ -57,7 +53,6 @@ export const createVectorInstances = async ( .map(async (_, idx) => { const instanceOverrides = overrides[idx] || {}; const messagingService = shareServices ? sharedMessaging : new MemoryMessagingService(); - const lockService = shareServices ? sharedLock : new MemoryLockService(); const logger = instanceOverrides.logger ?? Pino(); const chainReader = shareServices ? sharedChain @@ -65,7 +60,6 @@ export const createVectorInstances = async ( const opts = { messagingService, - lockService, storeService: new MemoryStoreService(), signer: getRandomChannelSigner(provider), chainReader, diff --git a/modules/protocol/src/testing/vector.spec.ts b/modules/protocol/src/testing/vector.spec.ts index 214977ebc..30a0f4b2a 100644 --- a/modules/protocol/src/testing/vector.spec.ts +++ b/modules/protocol/src/testing/vector.spec.ts @@ -10,12 +10,10 @@ import { MemoryStoreService, expect, MemoryMessagingService, - MemoryLockService, } from "@connext/vector-utils"; import pino from "pino"; import { IVectorChainReader, - ILockService, IMessagingService, IVectorStore, UpdateType, @@ -33,7 +31,6 @@ import { env } from "./env"; describe("Vector", () => { let chainReader: Sinon.SinonStubbedInstance; - let lockService: Sinon.SinonStubbedInstance; let messagingService: Sinon.SinonStubbedInstance; let storeService: Sinon.SinonStubbedInstance; @@ -42,7 +39,6 @@ describe("Vector", () => { chainReader.getChannelFactoryBytecode.resolves(Result.ok(mkHash())); chainReader.getChannelMastercopyAddress.resolves(Result.ok(mkAddress())); chainReader.getChainProviders.returns(Result.ok(env.chainProviders)); - lockService = Sinon.createStubInstance(MemoryLockService); messagingService = Sinon.createStubInstance(MemoryMessagingService); storeService = Sinon.createStubInstance(MemoryStoreService); storeService.getChannelStates.resolves([]); @@ -61,7 +57,6 @@ describe("Vector", () => { const signer = getRandomChannelSigner(); const node = await Vector.connect( messagingService, - lockService, storeService, signer, chainReader as IVectorChainReader, @@ -97,7 +92,6 @@ describe("Vector", () => { chainReader.registerChannel.resolves(Result.ok(undefined)); vector = await Vector.connect( messagingService, - lockService, storeService, signer, chainReader as IVectorChainReader, @@ -112,8 +106,6 @@ describe("Vector", () => { }); const result = await vector.setup(details); expect(result.getError()).to.be.undefined; - expect(lockService.acquireLock.callCount).to.be.eq(1); - expect(lockService.releaseLock.callCount).to.be.eq(1); }); it("should fail if it fails to generate the create2 address", async () => { @@ -224,7 +216,6 @@ describe("Vector", () => { vector = await Vector.connect( messagingService, - lockService, storeService, signer, chainReader as IVectorChainReader, @@ -237,8 +228,6 @@ describe("Vector", () => { const { details } = createTestUpdateParams(UpdateType.deposit, { channelAddress }); const result = await vector.deposit({ ...details, channelAddress }); expect(result.getError()).to.be.undefined; - expect(lockService.acquireLock.callCount).to.be.eq(1); - expect(lockService.releaseLock.callCount).to.be.eq(1); }); describe("should validate parameters", () => { @@ -294,7 +283,6 @@ describe("Vector", () => { vector = await Vector.connect( messagingService, - lockService, storeService, signer, chainReader as IVectorChainReader, @@ -307,8 +295,6 @@ describe("Vector", () => { const { details } = createTestUpdateParams(UpdateType.create, { channelAddress }); const result = await vector.create({ ...details, channelAddress }); expect(result.getError()).to.be.undefined; - expect(lockService.acquireLock.callCount).to.be.eq(1); - expect(lockService.releaseLock.callCount).to.be.eq(1); }); describe("should validate parameters", () => { @@ -402,7 +388,6 @@ describe("Vector", () => { vector = await Vector.connect( messagingService, - lockService, storeService, signer, chainReader as IVectorChainReader, @@ -415,8 +400,6 @@ describe("Vector", () => { const { details } = createTestUpdateParams(UpdateType.resolve, { channelAddress }); const result = await vector.resolve({ ...details, channelAddress }); expect(result.getError()).to.be.undefined; - expect(lockService.acquireLock.callCount).to.be.eq(1); - expect(lockService.releaseLock.callCount).to.be.eq(1); }); describe("should validate parameters", () => { diff --git a/modules/protocol/src/vector.ts b/modules/protocol/src/vector.ts index 7667d1f2c..66370f4e4 100644 --- a/modules/protocol/src/vector.ts +++ b/modules/protocol/src/vector.ts @@ -1,4 +1,3 @@ -import { ChannelMastercopy } from "@connext/vector-contracts"; import { ChannelUpdate, ChannelUpdateEvent, @@ -6,7 +5,6 @@ import { FullTransferState, IChannelSigner, IExternalValidation, - ILockService, IMessagingService, IVectorChainReader, IVectorProtocol, @@ -20,7 +18,6 @@ import { TChannelUpdate, ProtocolError, jsonifyError, - ChainReaderEvents, } from "@connext/vector-types"; import { getCreate2MultisigAddress, getRandomBytes32 } from "@connext/vector-utils"; import { Evt } from "evt"; @@ -40,7 +37,6 @@ export class Vector implements IVectorProtocol { // make it private so the only way to create the class is to use `connect` private constructor( private readonly messagingService: IMessagingService, - private readonly lockService: ILockService, private readonly storeService: IVectorStore, private readonly signer: IChannelSigner, private readonly chainReader: IVectorChainReader, @@ -51,7 +47,6 @@ export class Vector implements IVectorProtocol { static async connect( messagingService: IMessagingService, - lockService: ILockService, storeService: IVectorStore, signer: IChannelSigner, chainReader: IVectorChainReader, @@ -75,7 +70,6 @@ export class Vector implements IVectorProtocol { // channel is `setup` plus is not in dispute const node = await new Vector( messagingService, - lockService, storeService, signer, chainReader, @@ -158,27 +152,8 @@ export class Vector implements IVectorProtocol { } const isAlice = this.publicIdentifier === aliceIdentifier; const counterpartyIdentifier = isAlice ? bobIdentifier : aliceIdentifier; - let key: string; - try { - key = await this.lockService.acquireLock(params.channelAddress, isAlice, counterpartyIdentifier); - } catch (e) { - return Result.fail( - new OutboundChannelUpdateError(OutboundChannelUpdateError.reasons.AcquireLockFailed, params, channel, { - lockError: e.message, - }), - ); - } + throw new Error("must implement internal queueing"); const outboundRes = await this.lockedOperation(params); - try { - await this.lockService.releaseLock(params.channelAddress, key, isAlice, counterpartyIdentifier); - } catch (e) { - return Result.fail( - new OutboundChannelUpdateError(OutboundChannelUpdateError.reasons.ReleaseLockFailed, params, channel, { - outboundResult: outboundRes.toJson(), - lockError: jsonifyError(e), - }), - ); - } return outboundRes; } diff --git a/modules/server-node/src/helpers/nodes.ts b/modules/server-node/src/helpers/nodes.ts index 0953f5430..4b3edcb66 100644 --- a/modules/server-node/src/helpers/nodes.ts +++ b/modules/server-node/src/helpers/nodes.ts @@ -1,20 +1,15 @@ import { VectorChainService } from "@connext/vector-contracts"; import { VectorEngine } from "@connext/vector-engine"; -import { EngineEvents, ILockService, IVectorChainService, IVectorEngine, IServerNodeStore } from "@connext/vector-types"; +import { EngineEvents, IVectorChainService, IVectorEngine, IServerNodeStore } from "@connext/vector-types"; import { ChannelSigner, NatsMessagingService, logAxiosError } from "@connext/vector-utils"; import Axios from "axios"; import { Wallet } from "@ethersproject/wallet"; import { logger, _providers } from "../index"; import { config } from "../config"; -import { LockService } from "../services/lock"; const ETH_STANDARD_PATH = "m/44'/60'/0'/0"; -export function getLockService(publicIdentifier: string): ILockService | undefined { - return nodes[publicIdentifier]?.lockService; -} - export function getPath(index = 0): string { return `${ETH_STANDARD_PATH}/${(String(index).match(/.{1,9}/gi) || [index]).join("/")}`; } @@ -27,7 +22,6 @@ export let nodes: { [publicIdentifier: string]: { node: IVectorEngine; chainService: IVectorChainService; - lockService: ILockService; index: number; }; } = {}; @@ -66,16 +60,8 @@ export const createNode = async ( await messaging.connect(); logger.info({ method, messagingUrl: config.messagingUrl }, "Connected NatsMessagingService"); - const lockService = await LockService.connect( - signer.publicIdentifier, - messaging, - logger.child({ module: "LockService" }), - ); - logger.info({ method }, "Connected LockService"); - const vectorEngine = await VectorEngine.connect( messaging, - lockService, store, signer, vectorTx, @@ -102,7 +88,7 @@ export const createNode = async ( logger.info({ event, method, publicIdentifier: signer.publicIdentifier, index }, "Set up subscription for event"); } - nodes[signer.publicIdentifier] = { node: vectorEngine, chainService: vectorTx, index, lockService }; + nodes[signer.publicIdentifier] = { node: vectorEngine, chainService: vectorTx, index }; store.setNodeIndex(index, signer.publicIdentifier); return vectorEngine; }; diff --git a/modules/server-node/src/services/lock.ts b/modules/server-node/src/services/lock.ts deleted file mode 100644 index 3c94aa191..000000000 --- a/modules/server-node/src/services/lock.ts +++ /dev/null @@ -1,160 +0,0 @@ -import { - ILockService, - IMessagingService, - LockInformation, - NodeError, - Result, - jsonifyError, -} from "@connext/vector-types"; -import { MemoryLockService } from "@connext/vector-utils"; -import { BaseLogger } from "pino"; - -import { ServerNodeLockError } from "../helpers/errors"; - -export class LockService implements ILockService { - private constructor( - private readonly memoryLockService: MemoryLockService, - private readonly publicIdentifier: string, - private readonly messagingService: IMessagingService, - private readonly log: BaseLogger, - ) {} - - static async connect( - publicIdentifier: string, - messagingService: IMessagingService, - log: BaseLogger, - ): Promise { - const memoryLockService = new MemoryLockService(); - const lock = new LockService(memoryLockService, publicIdentifier, messagingService, log); - await lock.setupPeerListeners(); - return lock; - } - - private async setupPeerListeners(): Promise { - // Alice always hosts the lock service, so only alice will use - // this callback - return this.messagingService.onReceiveLockMessage( - this.publicIdentifier, - async (lockRequest: Result, from: string, inbox: string) => { - if (lockRequest.isError) { - // Handle a lock failure here - this.log.error( - { - method: "onReceiveLockMessage", - error: lockRequest.getError()?.message, - context: lockRequest.getError()?.context, - }, - "Error in lockRequest", - ); - return; - } - const { type, lockName, lockValue } = lockRequest.getValue(); - if (type === "acquire") { - let acqValue; - let method = "acquireLock"; - try { - acqValue = await this.acquireLock(lockName, true); - method = "respondToLockMessage"; - await this.messagingService.respondToLockMessage(inbox, Result.ok({ lockName, lockValue: acqValue, type })); - } catch (e) { - this.log.error( - { - method: "onReceiveLockMessage", - error: e.message, - }, - "Error acquiring lock", - ); - await this.messagingService.respondToLockMessage( - inbox, - Result.fail( - new ServerNodeLockError(ServerNodeLockError.reasons.AcquireLockFailed, lockName, lockValue, { - acqValue, - failingMethod: method, - lockError: e.message, - }), - ), - ); - } - } else if (type === "release") { - let method = "releaseLock"; - try { - await this.releaseLock(lockName, lockValue!, true); - method = "respondToLockMessage"; - await this.messagingService.respondToLockMessage(inbox, Result.ok({ lockName, type })); - } catch (e) { - this.log.error( - { - method: "onReceiveLockMessage", - error: e.message, - }, - "Error releasing lock", - ); - await this.messagingService.respondToLockMessage( - inbox, - Result.fail( - new ServerNodeLockError(ServerNodeLockError.reasons.FailedToReleaseLock, lockName, lockValue, { - failingMethod: method, - releaseError: e.message, - ...(e.context ?? {}), - }), - ), - ); - } - } - }, - ); - } - - public async acquireLock(lockName: string, isAlice = true, counterpartyPublicIdentifier?: string): Promise { - if (isAlice) { - return this.memoryLockService.acquireLock(lockName); - } else { - const res = await this.messagingService.sendLockMessage( - Result.ok({ type: "acquire", lockName }), - counterpartyPublicIdentifier!, - this.publicIdentifier, - ); - if (res.isError) { - throw new ServerNodeLockError(ServerNodeLockError.reasons.AcquireMessageFailed, lockName, undefined, { - counterpartyPublicIdentifier, - isAlice, - messagingError: jsonifyError(res.getError()!), - }); - } - const { lockValue } = res.getValue(); - if (!lockValue) { - throw new ServerNodeLockError(ServerNodeLockError.reasons.SentMessageAcquisitionFailed, lockName, lockValue, { - counterpartyPublicIdentifier, - isAlice, - }); - } - this.log.debug({ method: "acquireLock", lockName, lockValue }, "Acquired lock"); - return lockValue; - } - } - - public async releaseLock( - lockName: string, - lockValue: string, - isAlice = true, - counterpartyPublicIdentifier?: string, - ): Promise { - if (isAlice) { - return this.memoryLockService.releaseLock(lockName, lockValue); - } else { - const result = await this.messagingService.sendLockMessage( - Result.ok({ type: "release", lockName, lockValue }), - counterpartyPublicIdentifier!, - this.publicIdentifier, - ); - if (result.isError) { - throw new ServerNodeLockError(ServerNodeLockError.reasons.ReleaseMessageFailed, lockName, lockValue, { - messagingError: jsonifyError(result.getError()!), - counterpartyPublicIdentifier, - isAlice, - }); - } - this.log.debug({ method: "releaseLock", lockName, lockValue }, "Released lock"); - } - } -} diff --git a/modules/types/src/index.ts b/modules/types/src/index.ts index 5a735b06b..c03222325 100644 --- a/modules/types/src/index.ts +++ b/modules/types/src/index.ts @@ -9,7 +9,6 @@ export * from "./engine"; export * from "./error"; export * from "./event"; export * from "./externalValidation"; -export * from "./lock"; export * from "./messaging"; export * from "./network"; export * from "./node"; diff --git a/modules/types/src/lock.ts b/modules/types/src/lock.ts deleted file mode 100644 index 1a92b74db..000000000 --- a/modules/types/src/lock.ts +++ /dev/null @@ -1,20 +0,0 @@ -export type LockInformation = { - type: "acquire" | "release"; - lockName: string; - lockValue?: string; -}; - -export interface ILockService { - acquireLock( - lockName: string /* Bytes32? */, - isAlice?: boolean, - counterpartyPublicIdentifier?: string, - ): Promise; - - releaseLock( - lockName: string /* Bytes32? */, - lockValue: string, - isAlice?: boolean, - counterpartyPublicIdentifier?: string, - ): Promise; -} diff --git a/modules/types/src/messaging.ts b/modules/types/src/messaging.ts index e4a261cb6..7e606f7d4 100644 --- a/modules/types/src/messaging.ts +++ b/modules/types/src/messaging.ts @@ -1,6 +1,5 @@ import { ChannelUpdate, FullChannelState, FullTransferState } from "./channel"; import { EngineError, NodeError, MessagingError, ProtocolError, Result, RouterError, VectorError } from "./error"; -import { LockInformation } from "./lock"; import { EngineParams, NodeResponses } from "./schemas"; export type CheckInInfo = { channelAddress: string }; @@ -24,19 +23,6 @@ export interface IBasicMessaging { type TransferQuoteRequest = Omit; export interface IMessagingService extends IBasicMessaging { - onReceiveLockMessage( - myPublicIdentifier: string, - callback: (lockInfo: Result, from: string, inbox: string) => void, - ): Promise; - sendLockMessage( - lockInfo: Result, - to: string, - from: string, - timeout?: number, - numRetries?: number, - ): Promise>; - respondToLockMessage(inbox: string, lockInformation: Result): Promise; - onReceiveProtocolMessage( myPublicIdentifier: string, callback: ( diff --git a/modules/utils/src/index.ts b/modules/utils/src/index.ts index cb7705f12..0f84fadc3 100644 --- a/modules/utils/src/index.ts +++ b/modules/utils/src/index.ts @@ -15,7 +15,6 @@ export * from "./fs"; export * from "./hexStrings"; export * from "./identifiers"; export * from "./json"; -export * from "./lock"; export * from "./fees"; export * from "./math"; export * from "./merkle"; diff --git a/modules/utils/src/lock.spec.ts b/modules/utils/src/lock.spec.ts deleted file mode 100644 index cd90d4d1e..000000000 --- a/modules/utils/src/lock.spec.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { MemoryLockService, LOCK_TTL } from "./lock"; - -import { delay, expect } from "./"; - -describe("MemoLock", () => { - describe("with a common lock", () => { - let module: MemoryLockService; - - beforeEach(async () => { - module = new MemoryLockService(); - }); - - it("should not allow locks to simultaneously access resources", async function () { - this.timeout(60_000); - const store = { test: "value" }; - const callback = async (lockName: string, wait: number = LOCK_TTL / 2) => { - await delay(wait); - store.test = lockName; - }; - const lock = await module.acquireLock("foo"); - callback("round1").then(async () => { - await module.releaseLock("foo", lock); - }); - const nextLock = await module.acquireLock("foo"); - expect(nextLock).to.not.eq(lock); - await callback("round2", LOCK_TTL / 4); - await module.releaseLock("foo", nextLock); - expect(store.test).to.be.eq("round2"); - }).timeout(); - - it("should allow locking to occur", async function () { - const lock = await module.acquireLock("foo"); - const start = Date.now(); - setTimeout(() => { - module.releaseLock("foo", lock); - }, 101); - const nextLock = await module.acquireLock("foo"); - expect(Date.now() - start).to.be.at.least(100); - await module.releaseLock("foo", nextLock); - }); - - it("should handle deadlocks", async function () { - this.timeout(60_000); - await module.acquireLock("foo"); - await delay(800); - const lock = await module.acquireLock("foo"); - await module.releaseLock("foo", lock); - }); - - it("should handle concurrent locking", async function () { - this.timeout(60_000); - const start = Date.now(); - const array = [1, 2, 3, 4]; - await Promise.all( - array.map(async (i) => { - const lock = await module.acquireLock("foo"); - await delay(800); - await module.releaseLock("foo", lock); - expect(Date.now() - start).to.be.gte(700 * i); - }), - ); - }); - }); -}); diff --git a/modules/utils/src/lock.ts b/modules/utils/src/lock.ts deleted file mode 100644 index 29bd387e3..000000000 --- a/modules/utils/src/lock.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { randomBytes } from "crypto"; - -import { ILockService } from "@connext/vector-types"; -import { Mutex, MutexInterface } from "async-mutex"; - -type InternalLock = { - lock: Mutex; - releaser: MutexInterface.Releaser; - timer: NodeJS.Timeout; - secret: string; -}; - -export const LOCK_TTL = 30_000; - -export class MemoryLockService implements ILockService { - public readonly locks: Map = new Map(); - private readonly ttl = LOCK_TTL; - - async acquireLock(lockName: string): Promise { - let lock = this.locks.get(lockName)?.lock; - if (!lock) { - lock = new Mutex(); - this.locks.set(lockName, { lock, releaser: undefined, timer: undefined, secret: undefined }); - } - - const releaser = await lock.acquire(); - const secret = this.randomValue(); - const timer = setTimeout(() => this.releaseLock(lockName, secret), this.ttl); - this.locks.set(lockName, { lock, releaser, timer, secret }); - return secret; - } - - async releaseLock(lockName: string, lockValue: string): Promise { - const lock = this.locks.get(lockName); - - if (!lock) { - throw new Error(`Can't release a lock that doesn't exist: ${lockName}`); - } - if (lockValue !== lock.secret) { - throw new Error("Incorrect lock value"); - } - - clearTimeout(lock.timer); - return lock.releaser(); - } - - private randomValue() { - return randomBytes(16).toString("hex"); - } -} diff --git a/modules/utils/src/messaging.ts b/modules/utils/src/messaging.ts index 410524275..19513cf4a 100644 --- a/modules/utils/src/messaging.ts +++ b/modules/utils/src/messaging.ts @@ -3,7 +3,6 @@ import { ChannelUpdate, IMessagingService, NodeError, - LockInformation, Result, EngineParams, FullChannelState, @@ -484,29 +483,6 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I } //////////// - // LOCK METHODS - async sendLockMessage( - lockInfo: Result, - to: string, - from: string, - timeout = 30_000, // TODO this timeout is copied from memolock - numRetries = 0, - ): Promise> { - return this.sendMessageWithRetries(lockInfo, "lock", to, from, timeout, numRetries, "sendLockMessage"); - } - - async onReceiveLockMessage( - publicIdentifier: string, - callback: (lockInfo: Result, from: string, inbox: string) => void, - ): Promise { - return this.registerCallback(`${publicIdentifier}.*.lock`, callback, "onReceiveLockMessage"); - } - - async respondToLockMessage(inbox: string, lockInformation: Result): Promise { - return this.respondToMessage(inbox, lockInformation, "respondToLockMessage"); - } - //////////// - // ISALIVE METHODS sendIsAliveMessage( isAlive: Result<{ channelAddress: string; skipCheckIn?: boolean }, VectorError>, diff --git a/modules/utils/src/test/services/index.ts b/modules/utils/src/test/services/index.ts index c28a3856f..699af3dee 100644 --- a/modules/utils/src/test/services/index.ts +++ b/modules/utils/src/test/services/index.ts @@ -1,3 +1,2 @@ -export * from "../../lock"; export * from "./messaging"; export * from "./store"; diff --git a/modules/utils/src/test/services/messaging.ts b/modules/utils/src/test/services/messaging.ts index cc09470a9..5e724f19f 100644 --- a/modules/utils/src/test/services/messaging.ts +++ b/modules/utils/src/test/services/messaging.ts @@ -2,7 +2,6 @@ import { ChannelUpdate, IMessagingService, NodeError, - LockInformation, MessagingError, Result, FullChannelState, @@ -184,25 +183,6 @@ export class MemoryMessagingService implements IMessagingService { throw new Error("Method not implemented."); } - respondToLockMessage(inbox: string, lockInformation: Result): Promise { - throw new Error("Method not implemented."); - } - onReceiveLockMessage( - myPublicIdentifier: string, - callback: (lockInfo: Result, from: string, inbox: string) => void, - ): Promise { - throw new Error("Method not implemented."); - } - sendLockMessage( - lockInfo: Result, - to: string, - from: string, - timeout?: number, - numRetries?: number, - ): Promise> { - throw new Error("Method not implemented."); - } - sendIsAliveMessage( isAlive: Result<{ channelAddress: string }, VectorError>, to: string,