diff --git a/packages/libp2p/src/circuit-relay/transport/listener.ts b/packages/libp2p/src/circuit-relay/transport/listener.ts index 29b6c6e608..d7718c7698 100644 --- a/packages/libp2p/src/circuit-relay/transport/listener.ts +++ b/packages/libp2p/src/circuit-relay/transport/listener.ts @@ -2,10 +2,8 @@ import { CodeError } from '@libp2p/interface/errors' import { TypedEventEmitter } from '@libp2p/interface/events' import { logger } from '@libp2p/logger' import { PeerMap } from '@libp2p/peer-collections' -import { peerIdFromString } from '@libp2p/peer-id' import { multiaddr } from '@multiformats/multiaddr' import type { ReservationStore } from './reservation-store.js' -import type { Connection } from '@libp2p/interface/connection' import type { PeerId } from '@libp2p/interface/peer-id' import type { Listener, ListenerEvents } from '@libp2p/interface/transport' import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' @@ -41,25 +39,9 @@ class CircuitRelayTransportListener extends TypedEventEmitter im async listen (addr: Multiaddr): Promise { log('listen on %a', addr) - const relayPeerStr = addr.getPeerId() - let relayConn: Connection | undefined - - // check if we already have a connection to the relay - if (relayPeerStr != null) { - const relayPeer = peerIdFromString(relayPeerStr) - const connections = this.connectionManager.getConnectionsMap().get(relayPeer) ?? [] - - if (connections.length > 0) { - relayConn = connections[0] - } - } - - // open a new connection as we don't already have one - if (relayConn == null) { - const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '') - const ma = multiaddr(addrString) - relayConn = await this.connectionManager.openConnection(ma) - } + // remove the circuit part to get the peer id of the relay + const relayAddr = addr.decapsulate('/p2p-circuit') + const relayConn = await this.connectionManager.openConnection(relayAddr) if (!this.relayStore.hasReservation(relayConn.remotePeer)) { // addRelay calls transportManager.listen which calls this listen method diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index 232d2e39e1..280e677c02 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -1,6 +1,7 @@ import { AbortError, CodeError } from '@libp2p/interface/errors' import { setMaxListeners } from '@libp2p/interface/events' import { logger } from '@libp2p/logger' +import { PeerMap } from '@libp2p/peer-collections' import { defaultAddressSort } from '@libp2p/utils/address-sort' import { type Multiaddr, type Resolver, resolvers } from '@multiformats/multiaddr' import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' @@ -35,6 +36,7 @@ export interface PendingDialTarget { export interface DialOptions extends AbortOptions { priority?: number + force?: boolean } interface PendingDialInternal extends PendingDial { @@ -48,6 +50,7 @@ interface DialerInit { maxParallelDialsPerPeer?: number dialTimeout?: number resolvers?: Record + connections?: PeerMap } const defaultOptions = { @@ -83,12 +86,14 @@ export class DialQueue { private readonly inProgressDialCount?: Metric private readonly pendingDialCount?: Metric private readonly shutDownController: AbortController + private readonly connections: PeerMap constructor (components: DialQueueComponents, init: DialerInit = {}) { this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout + this.connections = init.connections ?? new PeerMap() this.peerId = components.peerId this.peerStore = components.peerStore @@ -187,6 +192,23 @@ export class DialQueue { throw err } + // make sure we don't have an existing connection to any of the addresses we + // are about to dial + let existingConnection = Array.from(this.connections.values()).flat().find(conn => { + if (options.force === true) { + return false + } + + return addrsToDial.find(addr => { + return addr.multiaddr.equals(conn.remoteAddr) + }) + }) + + if (existingConnection != null) { + log('already connected to %a', existingConnection.remoteAddr) + return existingConnection + } + // ready to dial, all async work finished - make sure we don't have any // pending dials in progress for this peer or set of multiaddrs const existingDial = this.pendingDials.find(dial => { @@ -257,7 +279,28 @@ export class DialQueue { // let other dials join this one this.pendingDials.push(pendingDial) - return pendingDial.promise + const connection = await pendingDial.promise + + // we may have been dialing a multiaddr without a peer id attached but by + // this point we have upgraded the connection so the remote peer information + // should be available - check again that we don't already have a connection + // to the remote multiaddr + existingConnection = Array.from(this.connections.values()).flat().find(conn => { + if (options.force === true) { + return false + } + + return conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr) + }) + + if (existingConnection != null) { + log('already connected to %a', existingConnection.remoteAddr) + await connection.close() + return existingConnection + } + + log('connection opened to %a', connection.remoteAddr) + return connection } private createDialAbortControllers (userSignal?: AbortSignal): ClearableSignal { diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index c13e49a53a..0afceac860 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -261,7 +261,8 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT, resolvers: init.resolvers ?? { dnsaddr: dnsaddrResolver - } + }, + connections: this.connections }) } @@ -505,12 +506,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { if (peerId != null && options.force !== true) { log('dial %p', peerId) - const existingConnections = this.getConnections(peerId) + const existingConnection = this.getConnections(peerId) + .find(conn => !conn.transient) - if (existingConnections.length > 0) { - log('had an existing connection to %p', peerId) + if (existingConnection != null) { + log('had an existing non-transient connection to %p', peerId) - return existingConnections[0] + return existingConnection } } diff --git a/packages/libp2p/test/connection-manager/index.spec.ts b/packages/libp2p/test/connection-manager/index.spec.ts index ee31843332..4cde008591 100644 --- a/packages/libp2p/test/connection-manager/index.spec.ts +++ b/packages/libp2p/test/connection-manager/index.spec.ts @@ -536,4 +536,40 @@ describe('Connection Manager', () => { await expect(connectionManager.acceptIncomingConnection(maConn2)) .to.eventually.be.true() }) + + it('should allow dialing peers when an existing transient connection exists', async () => { + connectionManager = new DefaultConnectionManager({ + peerId: await createEd25519PeerId(), + peerStore: stubInterface(), + transportManager: stubInterface(), + connectionGater: stubInterface(), + events: new TypedEventEmitter() + }, { + ...defaultOptions, + maxIncomingPendingConnections: 1 + }) + await connectionManager.start() + + const targetPeer = await createEd25519PeerId() + const addr = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${targetPeer}`) + + const existingConnection = stubInterface({ + transient: true + }) + const newConnection = stubInterface() + + sinon.stub(connectionManager.dialQueue, 'dial') + .withArgs(addr) + .resolves(newConnection) + + // we have an existing transient connection + const map = connectionManager.getConnectionsMap() + map.set(targetPeer, [ + existingConnection + ]) + + const conn = await connectionManager.openConnection(addr) + + expect(conn).to.equal(newConnection) + }) })