Skip to content

Commit 1f5a061

Browse files
maschadachingbrain
authored andcommitted
refactor!: remove per-peer parallel dialling (#2090)
Since we are now dialling one peer at a time as a means of smarter dialling, we no longer need the option to control the concurrency of parallel dials across a single peers' multiaddrs. Closes #2090
1 parent d8f5bc2 commit 1f5a061

File tree

7 files changed

+51
-223
lines changed

7 files changed

+51
-223
lines changed

doc/migrations/v0.46-v1.0.0.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<!--Specify versions for migration below-->
2-
# Migrating to [email protected].0 <!-- omit in toc -->
2+
# Migrating to [email protected] <!-- omit in toc -->
33

44
A migration guide for refactoring your application code from libp2p `v0.46` to `v1.0.0`.
55

@@ -16,6 +16,7 @@ A migration guide for refactoring your application code from libp2p `v0.46` to `
1616
- [Plaintext](#plaintext)
1717
- [Pnet](#pnet)
1818
- [Metrics](#metrics)
19+
- [Connection Manager](#connection-manager)
1920

2021
## AutoNAT
2122

@@ -311,3 +312,11 @@ The following metrics were renamed:
311312

312313
`libp2p_dialler_pending_dials` => `libp2p_dial_queue_pending_dials`
313314
`libp2p_dialler_in_progress_dials` => `libp2p_dial_queue_in_progress_dials`
315+
316+
## Connection Manager
317+
318+
The observed behavior of dialing peers has been that given a list of supported addresses, if any one routable address would succeed then all would succeed and that if any routable address would fail then all would fail.
319+
320+
Consequently the previous dial behaviour of dialing all available addresses (up to a concurrency limit) and cancelling any in-flight dials when the first succeeds was a very inefficient use of resources.
321+
322+
Since `[email protected]` we have only dialed one address at a time for each peer by setting the default value of the `ConnectionManager`'s `maxParallelDialsPerPeer` option to `1`. As of `[email protected]` this option has been removed.

packages/libp2p/src/connection-manager/constants.defaults.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,6 @@ export const INBOUND_UPGRADE_TIMEOUT = 30e3
1313
*/
1414
export const MAX_PEER_ADDRS_TO_DIAL = 25
1515

16-
/**
17-
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDialsPerPeer
18-
*/
19-
export const MAX_PARALLEL_DIALS_PER_PEER = 1
20-
2116
/**
2217
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialInterval
2318
*/

packages/libp2p/src/connection-manager/dial-queue.ts

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import { codes } from '../errors.js'
1212
import { getPeerAddress } from '../get-peer.js'
1313
import {
1414
DIAL_TIMEOUT,
15-
MAX_PARALLEL_DIALS_PER_PEER,
1615
MAX_PARALLEL_DIALS,
1716
MAX_PEER_ADDRS_TO_DIAL,
1817
LAST_DIAL_FAILURE_KEY
@@ -44,7 +43,6 @@ interface DialerInit {
4443
addressSorter?: AddressSorter
4544
maxParallelDials?: number
4645
maxPeerAddrsToDial?: number
47-
maxParallelDialsPerPeer?: number
4846
dialTimeout?: number
4947
resolvers?: Record<string, Resolver>
5048
connections?: PeerMap<Connection[]>
@@ -54,7 +52,6 @@ const defaultOptions = {
5452
addressSorter: defaultAddressSort,
5553
maxParallelDials: MAX_PARALLEL_DIALS,
5654
maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL,
57-
maxParallelDialsPerPeer: MAX_PARALLEL_DIALS_PER_PEER,
5855
dialTimeout: DIAL_TIMEOUT,
5956
resolvers: {
6057
dnsaddr: dnsaddrResolver
@@ -79,7 +76,6 @@ export class DialQueue {
7976
private readonly transportManager: TransportManager
8077
private readonly addressSorter: AddressSorter
8178
private readonly maxPeerAddrsToDial: number
82-
private readonly maxParallelDialsPerPeer: number
8379
private readonly dialTimeout: number
8480
private readonly inProgressDialCount?: Metric
8581
private readonly pendingDialCount?: Metric
@@ -90,7 +86,6 @@ export class DialQueue {
9086
constructor (components: DialQueueComponents, init: DialerInit = {}) {
9187
this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter
9288
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
93-
this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer
9489
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout
9590
this.connections = init.connections ?? new PeerMap()
9691
this.log = components.logger.forComponent('libp2p:connection-manager:dial-queue')
@@ -463,18 +458,10 @@ export class DialQueue {
463458
const dialAbortControllers: Array<(AbortController | undefined)> = pendingDial.multiaddrs.map(() => new AbortController())
464459

465460
try {
466-
let success = false
467-
468-
// internal peer dial queue to ensure we only dial the configured number of addresses
469-
// per peer at the same time to prevent one peer with a lot of addresses swamping
470-
// the dial queue
471-
const peerDialQueue = new PQueue({
472-
concurrency: this.maxParallelDialsPerPeer
473-
})
461+
// internal peer dial queue - only one dial per peer at a time
462+
const peerDialQueue = new PQueue({ concurrency: 1 })
474463
peerDialQueue.on('error', (err) => {
475-
if (!success) {
476-
this.log.error('error dialling [%s] %o', pendingDial.multiaddrs, err)
477-
}
464+
this.log.error('error dialing [%s] %o', pendingDial.multiaddrs, err)
478465
})
479466

480467
const conn = await Promise.any(pendingDial.multiaddrs.map(async (addr, i) => {

packages/libp2p/src/connection-manager/index.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { codes } from '../errors.js'
99
import { getPeerAddress } from '../get-peer.js'
1010
import { AutoDial } from './auto-dial.js'
1111
import { ConnectionPruner } from './connection-pruner.js'
12-
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS_PER_PEER, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
12+
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
1313
import { DialQueue } from './dial-queue.js'
1414
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger } from '@libp2p/interface'
1515
import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection'
@@ -89,14 +89,6 @@ export interface ConnectionManagerInit {
8989
*/
9090
maxParallelDials?: number
9191

92-
/**
93-
* To prevent individual peers with large amounts of multiaddrs swamping the
94-
* dial queue, this value controls how many addresses to dial in parallel per
95-
* peer. So for example if two peers have 10 addresses and this value is set
96-
* at 5, we will dial 5 addresses from each at a time. (default: 1)
97-
*/
98-
maxParallelDialsPerPeer?: number
99-
10092
/**
10193
* Maximum number of addresses allowed for a given peer - if a peer has more
10294
* addresses than this then the dial will fail. (default: 25)
@@ -260,7 +252,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
260252
addressSorter: init.addressSorter ?? defaultAddressSort,
261253
maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS,
262254
maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL,
263-
maxParallelDialsPerPeer: init.maxParallelDialsPerPeer ?? MAX_PARALLEL_DIALS_PER_PEER,
264255
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
265256
resolvers: init.resolvers ?? {
266257
dnsaddr: dnsaddrResolver

packages/libp2p/test/connection-manager/dial-queue.spec.ts

Lines changed: 0 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -193,103 +193,6 @@ describe('dial queue', () => {
193193
expect(reject).to.have.property('callCount', addrs.length)
194194
})
195195

196-
it('should abort all dials when its signal is aborted', async () => {
197-
const signals: Record<string, AbortSignal | undefined> = {}
198-
const slowDial = async (): Promise<void> => {
199-
await delay(1000)
200-
}
201-
const actions: Record<string, (...args: any[]) => Promise<any>> = {
202-
'/ip4/127.0.0.1/tcp/1231': slowDial,
203-
'/ip4/127.0.0.1/tcp/1232': slowDial,
204-
'/ip4/127.0.0.1/tcp/1233': slowDial
205-
}
206-
const controller = new AbortController()
207-
208-
dialer = new DialQueue(components, {
209-
maxParallelDials: 2,
210-
maxParallelDialsPerPeer: 10
211-
})
212-
213-
components.transportManager.transportForMultiaddr.returns(stubInterface<Transport>())
214-
components.transportManager.dial.callsFake(async (ma, options = {}) => {
215-
const maStr = ma.toString()
216-
const action = actions[maStr]
217-
218-
if (action != null) {
219-
signals[maStr] = options.signal
220-
return action()
221-
}
222-
223-
throw new Error(`No action found for multiaddr ${maStr}`)
224-
})
225-
226-
setTimeout(() => { controller.abort() }, 100)
227-
228-
await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)), {
229-
signal: controller.signal
230-
})).to.eventually.be.rejected
231-
.with.property('name', 'CodeError')
232-
233-
expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true)
234-
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', true)
235-
expect(signals).to.not.have.property('/ip4/127.0.0.1/tcp/1233') // never dialled as above the maxParallelDials limit
236-
})
237-
238-
it('should abort other dials when one succeeds', async () => {
239-
const remotePeer = await createEd25519PeerId()
240-
const connection1 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
241-
const connection2 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
242-
const connection3 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
243-
const actions: Record<string, () => Promise<Connection>> = {
244-
'/ip4/127.0.0.1/tcp/1231': async () => {
245-
// Slow dial
246-
await delay(1000)
247-
248-
return connection1
249-
},
250-
'/ip4/127.0.0.1/tcp/1232': async () => {
251-
// Fast dial
252-
await delay(10)
253-
254-
return connection2
255-
},
256-
'/ip4/127.0.0.1/tcp/1233': async () => {
257-
// Slow dial
258-
await delay(1000)
259-
260-
return connection3
261-
}
262-
}
263-
const signals: Record<string, AbortSignal> = {}
264-
265-
components.transportManager.transportForMultiaddr.returns(stubInterface<Transport>())
266-
components.transportManager.dial.callsFake(async (ma, opts = {}) => {
267-
const maStr = ma.toString()
268-
const action = actions[maStr]
269-
270-
if (action != null) {
271-
signals[maStr] = opts.signal
272-
return action()
273-
}
274-
275-
throw new Error(`No action found for multiaddr ${maStr}`)
276-
})
277-
278-
dialer = new DialQueue(components, {
279-
maxParallelDials: 50,
280-
maxParallelDialsPerPeer: 10
281-
})
282-
283-
await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)))).to.eventually.equal(connection2)
284-
285-
// Dial attempt finished without connection
286-
expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true)
287-
// Dial attempt led to connection
288-
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', false)
289-
// Dial attempt finished without connection
290-
expect(signals['/ip4/127.0.0.1/tcp/1233']).to.have.property('aborted', true)
291-
})
292-
293196
it('should ignore DNS addresses for other peers', async () => {
294197
const remotePeer = await createEd25519PeerId()
295198
const otherRemotePeer = await createEd25519PeerId()

packages/libp2p/test/connection-manager/direct.node.ts

Lines changed: 36 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { MemoryDatastore } from 'datastore-core/memory'
2222
import delay from 'delay'
2323
import { pipe } from 'it-pipe'
2424
import { pushable } from 'it-pushable'
25-
import pDefer from 'p-defer'
2625
import pWaitFor from 'p-wait-for'
2726
import Sinon from 'sinon'
2827
import { stubInterface } from 'sinon-ts'
@@ -223,84 +222,57 @@ describe('dialing (direct, TCP)', () => {
223222
.and.to.have.property('code', ERR_TIMEOUT)
224223
})
225224

226-
it('should dial to the max concurrency', async () => {
227-
const peerId = await createEd25519PeerId()
228-
const addrs = [
229-
multiaddr('/ip4/0.0.0.0/tcp/8000'),
230-
multiaddr('/ip4/0.0.0.0/tcp/8001'),
231-
multiaddr('/ip4/0.0.0.0/tcp/8002')
232-
]
225+
it('should only dial to the max concurrency', async () => {
226+
const peerId1 = await createEd25519PeerId()
227+
const peerId2 = await createEd25519PeerId()
228+
const peerId3 = await createEd25519PeerId()
229+
230+
const addr1 = multiaddr(`/ip4/127.0.0.1/tcp/1234/p2p/${peerId1}`)
231+
const addr2 = multiaddr(`/ip4/127.0.12.4/tcp/3210/p2p/${peerId2}`)
232+
const addr3 = multiaddr(`/ip4/123.3.11.1/tcp/2010/p2p/${peerId3}`)
233+
234+
const slowDial = async (): Promise<Connection> => {
235+
await delay(100)
236+
return mockConnection(mockMultiaddrConnection(mockDuplex(), peerId1))
237+
}
238+
239+
const actions: Record<string, (...args: any[]) => Promise<any>> = {
240+
[addr1.toString()]: slowDial,
241+
[addr2.toString()]: slowDial,
242+
[addr3.toString()]: async () => mockConnection(mockMultiaddrConnection(mockDuplex(), peerId3))
243+
}
233244

234245
const dialer = new DialQueue(localComponents, {
235-
maxParallelDials: 2,
236-
maxParallelDialsPerPeer: 10
246+
maxParallelDials: 2
237247
})
238248

239-
const deferredDial = pDefer<Connection>()
240249
const transportManagerDialStub = Sinon.stub(localTM, 'dial')
241-
transportManagerDialStub.callsFake(async () => deferredDial.promise)
250+
transportManagerDialStub.callsFake(async ma => {
251+
const maStr = ma.toString()
252+
const action = actions[maStr]
242253

243-
// Perform 3 multiaddr dials
244-
void dialer.dial(addrs)
245-
246-
// Let the call stack run
247-
await delay(0)
254+
if (action != null) {
255+
return action()
256+
}
248257

249-
// We should have 2 in progress, and 1 waiting
250-
expect(transportManagerDialStub).to.have.property('callCount', 2)
258+
throw new Error(`No action found for multiaddr ${maStr}`)
259+
})
251260

252-
deferredDial.resolve(mockConnection(mockMultiaddrConnection(mockDuplex(), peerId)))
261+
// dial 3 different peers
262+
void Promise.all([
263+
dialer.dial(addr1),
264+
dialer.dial(addr2),
265+
dialer.dial(addr3)
266+
])
253267

254268
// Let the call stack run
255269
await delay(0)
256270

257-
// Only two dials should be executed, as the first dial will succeed
271+
// We should have 2 in progress, and 1 waiting
258272
expect(transportManagerDialStub).to.have.property('callCount', 2)
259-
})
260273

261-
it('should append the remote peerId to multiaddrs', async () => {
262-
const addrs = [
263-
multiaddr('/ip4/0.0.0.0/tcp/8000'),
264-
multiaddr('/ip4/0.0.0.0/tcp/8001'),
265-
multiaddr('/ip4/0.0.0.0/tcp/8002'),
266-
multiaddr('/unix/tmp/some/path.sock')
267-
]
268-
269-
// Inject data into the AddressBook
270-
await localComponents.peerStore.merge(remoteComponents.peerId, {
271-
multiaddrs: addrs
272-
})
273-
274-
const dialer = new DialQueue(localComponents, {
275-
maxParallelDialsPerPeer: 10
276-
})
277-
278-
const transportManagerDialStub = Sinon.stub(localTM, 'dial')
279-
transportManagerDialStub.callsFake(async (ma) => {
280-
await delay(10)
281-
return mockConnection(mockMultiaddrConnection(mockDuplex(), remoteComponents.peerId))
282-
})
283-
284-
// Perform dial
285-
await dialer.dial(remoteComponents.peerId)
274+
// stop dials
286275
dialer.stop()
287-
288-
// Dialled each address
289-
expect(transportManagerDialStub).to.have.property('callCount', 4)
290-
291-
for (let i = 0; i < addrs.length; i++) {
292-
const call = transportManagerDialStub.getCall(i)
293-
const ma = call.args[0]
294-
295-
// should not append peerId to path multiaddrs
296-
if (ma.toString().startsWith('/unix')) {
297-
expect(ma.toString()).to.not.endWith(`/p2p/${remoteComponents.peerId.toString()}`)
298-
299-
continue
300-
}
301-
302-
expect(ma.toString()).to.endWith(`/p2p/${remoteComponents.peerId.toString()}`)
303-
}
304276
})
305277
})
306278

0 commit comments

Comments
 (0)