Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 3 additions & 21 deletions packages/libp2p/src/circuit-relay/transport/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ import { CodeError } from '@libp2p/interface/errors'
import { TypedEventEmitter } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { peerIdFromString } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import type { ReservationStore } from './reservation-store.js'
import type { Connection } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Listener, ListenerEvents } from '@libp2p/interface/transport'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
Expand Down Expand Up @@ -41,25 +39,9 @@ class CircuitRelayTransportListener extends TypedEventEmitter<ListenerEvents> im
async listen (addr: Multiaddr): Promise<void> {
log('listen on %a', addr)

const relayPeerStr = addr.getPeerId()
let relayConn: Connection | undefined

// check if we already have a connection to the relay
if (relayPeerStr != null) {
const relayPeer = peerIdFromString(relayPeerStr)
const connections = this.connectionManager.getConnectionsMap().get(relayPeer) ?? []

if (connections.length > 0) {
relayConn = connections[0]
}
}

// open a new connection as we don't already have one
if (relayConn == null) {
const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '')
const ma = multiaddr(addrString)
relayConn = await this.connectionManager.openConnection(ma)
}
// remove the circuit part to get the peer id of the relay
const relayAddr = addr.decapsulate('/p2p-circuit')
const relayConn = await this.connectionManager.openConnection(relayAddr)

if (!this.relayStore.hasReservation(relayConn.remotePeer)) {
// addRelay calls transportManager.listen which calls this listen method
Expand Down
45 changes: 44 additions & 1 deletion packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AbortError, CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { type Multiaddr, type Resolver, resolvers } from '@multiformats/multiaddr'
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
Expand Down Expand Up @@ -35,6 +36,7 @@ export interface PendingDialTarget {

export interface DialOptions extends AbortOptions {
priority?: number
force?: boolean
}

interface PendingDialInternal extends PendingDial {
Expand All @@ -48,6 +50,7 @@ interface DialerInit {
maxParallelDialsPerPeer?: number
dialTimeout?: number
resolvers?: Record<string, Resolver>
connections?: PeerMap<Connection[]>
}

const defaultOptions = {
Expand Down Expand Up @@ -83,12 +86,14 @@ export class DialQueue {
private readonly inProgressDialCount?: Metric
private readonly pendingDialCount?: Metric
private readonly shutDownController: AbortController
private readonly connections: PeerMap<Connection[]>

constructor (components: DialQueueComponents, init: DialerInit = {}) {
this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout
this.connections = init.connections ?? new PeerMap()

this.peerId = components.peerId
this.peerStore = components.peerStore
Expand Down Expand Up @@ -187,6 +192,23 @@ export class DialQueue {
throw err
}

// make sure we don't have an existing connection to any of the addresses we
// are about to dial
let existingConnection = Array.from(this.connections.values()).flat().find(conn => {
if (options.force === true) {
return false
}

return addrsToDial.find(addr => {
return addr.multiaddr.equals(conn.remoteAddr)
})
})

if (existingConnection != null) {
log('already connected to %a', existingConnection.remoteAddr)
return existingConnection
}

// ready to dial, all async work finished - make sure we don't have any
// pending dials in progress for this peer or set of multiaddrs
const existingDial = this.pendingDials.find(dial => {
Expand Down Expand Up @@ -257,7 +279,28 @@ export class DialQueue {
// let other dials join this one
this.pendingDials.push(pendingDial)

return pendingDial.promise
const connection = await pendingDial.promise

// we may have been dialing a multiaddr without a peer id attached but by
// this point we have upgraded the connection so the remote peer information
// should be available - check again that we don't already have a connection
// to the remote multiaddr
existingConnection = Array.from(this.connections.values()).flat().find(conn => {
if (options.force === true) {
return false
}

return conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr)
})

if (existingConnection != null) {
log('already connected to %a', existingConnection.remoteAddr)
await connection.close()
return existingConnection
}

log('connection opened to %a', connection.remoteAddr)
return connection
}

private createDialAbortControllers (userSignal?: AbortSignal): ClearableSignal {
Expand Down
12 changes: 7 additions & 5 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
resolvers: init.resolvers ?? {
dnsaddr: dnsaddrResolver
}
},
connections: this.connections
})
}

Expand Down Expand Up @@ -505,12 +506,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

if (peerId != null && options.force !== true) {
log('dial %p', peerId)
const existingConnections = this.getConnections(peerId)
const existingConnection = this.getConnections(peerId)
.find(conn => !conn.transient)

if (existingConnections.length > 0) {
log('had an existing connection to %p', peerId)
if (existingConnection != null) {
log('had an existing non-transient connection to %p', peerId)

return existingConnections[0]
return existingConnection
}
}

Expand Down
36 changes: 36 additions & 0 deletions packages/libp2p/test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -536,4 +536,40 @@ describe('Connection Manager', () => {
await expect(connectionManager.acceptIncomingConnection(maConn2))
.to.eventually.be.true()
})

it('should allow dialing peers when an existing transient connection exists', async () => {
connectionManager = new DefaultConnectionManager({
peerId: await createEd25519PeerId(),
peerStore: stubInterface<PeerStore>(),
transportManager: stubInterface<TransportManager>(),
connectionGater: stubInterface<ConnectionGater>(),
events: new TypedEventEmitter()
}, {
...defaultOptions,
maxIncomingPendingConnections: 1
})
await connectionManager.start()

const targetPeer = await createEd25519PeerId()
const addr = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${targetPeer}`)

const existingConnection = stubInterface<Connection>({
transient: true
})
const newConnection = stubInterface<Connection>()

sinon.stub(connectionManager.dialQueue, 'dial')
.withArgs(addr)
.resolves(newConnection)

// we have an existing transient connection
const map = connectionManager.getConnectionsMap()
map.set(targetPeer, [
existingConnection
])

const conn = await connectionManager.openConnection(addr)

expect(conn).to.equal(newConnection)
})
})