Skip to content

Commit 66fbdb9

Browse files
maschadachingbrain
andauthored
refactor!: remove per-peer parallel dialling (#2163)
Since we are now dialling one peer at a time as a means of smarter dialling, we no longer need the option to control the concurrency of parallel dials across the multiaddrs of a single peer. Closes #2090 --------- Co-authored-by: Alex Potsides <[email protected]>
1 parent 5178617 commit 66fbdb9

File tree

7 files changed

+50
-218
lines changed

7 files changed

+50
-218
lines changed

doc/migrations/v0.46-v1.0.0.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<!--Specify versions for migration below-->
2-
# Migrating to [email protected].0 <!-- omit in toc -->
2+
# Migrating to [email protected] <!-- omit in toc -->
33

44
A migration guide for refactoring your application code from libp2p `v0.46` to `v1.0.0`.
55

@@ -9,6 +9,7 @@ A migration guide for refactoring your application code from libp2p `v0.46` to `
99
- [KeyChain](#keychain)
1010
- [Pnet](#pnet)
1111
- [Metrics](#metrics)
12+
- [Connection Manager](#connection-manager)
1213

1314
## AutoNAT
1415

@@ -77,3 +78,11 @@ The following metrics were renamed:
7778

7879
`libp2p_dialler_pending_dials` => `libp2p_dial_queue_pending_dials`
7980
`libp2p_dialler_in_progress_dials` => `libp2p_dial_queue_in_progress_dials`
81+
82+
## Connection Manager
83+
84+
The observed behavior of dialing peers has been that given a list of supported addresses, if any one routable address would succeed then all would succeed and that if any routable address would fail then all would fail.
85+
86+
Consequently the previous dial behaviour of dialing all available addresses (up to a concurrency limit) and cancelling any in-flight dials when the first succeeds was a very inefficient use of resources.
87+
88+
Since `[email protected]` we have only dialed one address at a time for each peer by setting the default value of the `ConnectionManager`'s `maxParallelDialsPerPeer` option to `1`. As of `[email protected]` this option has been removed.

packages/libp2p/src/connection-manager/constants.defaults.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,6 @@ export const INBOUND_UPGRADE_TIMEOUT = 30e3
1313
*/
1414
export const MAX_PEER_ADDRS_TO_DIAL = 25
1515

16-
/**
17-
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDialsPerPeer
18-
*/
19-
export const MAX_PARALLEL_DIALS_PER_PEER = 1
20-
2116
/**
2217
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialInterval
2318
*/

packages/libp2p/src/connection-manager/dial-queue.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import { codes } from '../errors.js'
1212
import { getPeerAddress } from '../get-peer.js'
1313
import {
1414
DIAL_TIMEOUT,
15-
MAX_PARALLEL_DIALS_PER_PEER,
1615
MAX_PARALLEL_DIALS,
1716
MAX_PEER_ADDRS_TO_DIAL,
1817
LAST_DIAL_FAILURE_KEY
@@ -45,7 +44,6 @@ interface DialerInit {
4544
addressSorter?: AddressSorter
4645
maxParallelDials?: number
4746
maxPeerAddrsToDial?: number
48-
maxParallelDialsPerPeer?: number
4947
dialTimeout?: number
5048
resolvers?: Record<string, Resolver>
5149
}
@@ -54,7 +52,6 @@ const defaultOptions = {
5452
addressSorter: defaultAddressSort,
5553
maxParallelDials: MAX_PARALLEL_DIALS,
5654
maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL,
57-
maxParallelDialsPerPeer: MAX_PARALLEL_DIALS_PER_PEER,
5855
dialTimeout: DIAL_TIMEOUT,
5956
resolvers: {
6057
dnsaddr: dnsaddrResolver
@@ -78,7 +75,6 @@ export class DialQueue {
7875
private readonly transportManager: TransportManager
7976
private readonly addressSorter: AddressSorter
8077
private readonly maxPeerAddrsToDial: number
81-
private readonly maxParallelDialsPerPeer: number
8278
private readonly dialTimeout: number
8379
private readonly inProgressDialCount?: Metric
8480
private readonly pendingDialCount?: Metric
@@ -87,7 +83,6 @@ export class DialQueue {
8783
constructor (components: DialQueueComponents, init: DialerInit = {}) {
8884
this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter
8985
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
90-
this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer
9186
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout
9287

9388
this.peerId = components.peerId
@@ -421,12 +416,8 @@ export class DialQueue {
421416
const dialAbortControllers: Array<(AbortController | undefined)> = pendingDial.multiaddrs.map(() => new AbortController())
422417

423418
try {
424-
// internal peer dial queue to ensure we only dial the configured number of addresses
425-
// per peer at the same time to prevent one peer with a lot of addresses swamping
426-
// the dial queue
427-
const peerDialQueue = new PQueue({
428-
concurrency: this.maxParallelDialsPerPeer
429-
})
419+
// internal peer dial queue - only one dial per peer at a time
420+
const peerDialQueue = new PQueue({ concurrency: 1 })
430421
peerDialQueue.on('error', (err) => {
431422
log.error('error dialling', err)
432423
})

packages/libp2p/src/connection-manager/index.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { codes } from '../errors.js'
1010
import { getPeerAddress } from '../get-peer.js'
1111
import { AutoDial } from './auto-dial.js'
1212
import { ConnectionPruner } from './connection-pruner.js'
13-
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS_PER_PEER, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
13+
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
1414
import { DialQueue } from './dial-queue.js'
1515
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions } from '@libp2p/interface'
1616
import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection'
@@ -92,14 +92,6 @@ export interface ConnectionManagerInit {
9292
*/
9393
maxParallelDials?: number
9494

95-
/**
96-
* To prevent individual peers with large amounts of multiaddrs swamping the
97-
* dial queue, this value controls how many addresses to dial in parallel per
98-
* peer. So for example if two peers have 10 addresses and this value is set
99-
* at 5, we will dial 5 addresses from each at a time. (default: 1)
100-
*/
101-
maxParallelDialsPerPeer?: number
102-
10395
/**
10496
* Maximum number of addresses allowed for a given peer - if a peer has more
10597
* addresses than this then the dial will fail. (default: 25)
@@ -257,7 +249,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
257249
addressSorter: init.addressSorter ?? defaultAddressSort,
258250
maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS,
259251
maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL,
260-
maxParallelDialsPerPeer: init.maxParallelDialsPerPeer ?? MAX_PARALLEL_DIALS_PER_PEER,
261252
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
262253
resolvers: init.resolvers ?? {
263254
dnsaddr: dnsaddrResolver

packages/libp2p/test/connection-manager/dial-queue.spec.ts

Lines changed: 0 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -186,103 +186,6 @@ describe('dial queue', () => {
186186
expect(reject).to.have.property('callCount', addrs.length)
187187
})
188188

189-
it('should abort all dials when its signal is aborted', async () => {
190-
const signals: Record<string, AbortSignal | undefined> = {}
191-
const slowDial = async (): Promise<void> => {
192-
await delay(1000)
193-
}
194-
const actions: Record<string, (...args: any[]) => Promise<any>> = {
195-
'/ip4/127.0.0.1/tcp/1231': slowDial,
196-
'/ip4/127.0.0.1/tcp/1232': slowDial,
197-
'/ip4/127.0.0.1/tcp/1233': slowDial
198-
}
199-
const controller = new AbortController()
200-
201-
dialer = new DialQueue(components, {
202-
maxParallelDials: 2,
203-
maxParallelDialsPerPeer: 10
204-
})
205-
206-
components.transportManager.transportForMultiaddr.returns(stubInterface<Transport>())
207-
components.transportManager.dial.callsFake(async (ma, options = {}) => {
208-
const maStr = ma.toString()
209-
const action = actions[maStr]
210-
211-
if (action != null) {
212-
signals[maStr] = options.signal
213-
return action()
214-
}
215-
216-
throw new Error(`No action found for multiaddr ${maStr}`)
217-
})
218-
219-
setTimeout(() => { controller.abort() }, 100)
220-
221-
await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)), {
222-
signal: controller.signal
223-
})).to.eventually.be.rejected
224-
.with.property('name', 'CodeError')
225-
226-
expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true)
227-
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', true)
228-
expect(signals).to.not.have.property('/ip4/127.0.0.1/tcp/1233') // never dialled as above the maxParallelDials limit
229-
})
230-
231-
it('should abort other dials when one succeeds', async () => {
232-
const remotePeer = await createEd25519PeerId()
233-
const connection1 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
234-
const connection2 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
235-
const connection3 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
236-
const actions: Record<string, () => Promise<Connection>> = {
237-
'/ip4/127.0.0.1/tcp/1231': async () => {
238-
// Slow dial
239-
await delay(1000)
240-
241-
return connection1
242-
},
243-
'/ip4/127.0.0.1/tcp/1232': async () => {
244-
// Fast dial
245-
await delay(10)
246-
247-
return connection2
248-
},
249-
'/ip4/127.0.0.1/tcp/1233': async () => {
250-
// Slow dial
251-
await delay(1000)
252-
253-
return connection3
254-
}
255-
}
256-
const signals: Record<string, AbortSignal> = {}
257-
258-
components.transportManager.transportForMultiaddr.returns(stubInterface<Transport>())
259-
components.transportManager.dial.callsFake(async (ma, opts = {}) => {
260-
const maStr = ma.toString()
261-
const action = actions[maStr]
262-
263-
if (action != null) {
264-
signals[maStr] = opts.signal
265-
return action()
266-
}
267-
268-
throw new Error(`No action found for multiaddr ${maStr}`)
269-
})
270-
271-
dialer = new DialQueue(components, {
272-
maxParallelDials: 50,
273-
maxParallelDialsPerPeer: 10
274-
})
275-
276-
await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)))).to.eventually.equal(connection2)
277-
278-
// Dial attempt finished without connection
279-
expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true)
280-
// Dial attempt led to connection
281-
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', false)
282-
// Dial attempt finished without connection
283-
expect(signals['/ip4/127.0.0.1/tcp/1233']).to.have.property('aborted', true)
284-
})
285-
286189
it('should ignore DNS addresses for other peers', async () => {
287190
const remotePeer = await createEd25519PeerId()
288191
const otherRemotePeer = await createEd25519PeerId()

packages/libp2p/test/connection-manager/direct.node.ts

Lines changed: 36 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import { MemoryDatastore } from 'datastore-core/memory'
2020
import delay from 'delay'
2121
import { pipe } from 'it-pipe'
2222
import { pushable } from 'it-pushable'
23-
import pDefer from 'p-defer'
2423
import pWaitFor from 'p-wait-for'
2524
import sinon from 'sinon'
2625
import { stubInterface } from 'sinon-ts'
@@ -218,84 +217,57 @@ describe('dialing (direct, TCP)', () => {
218217
.and.to.have.property('code', ERR_TIMEOUT)
219218
})
220219

221-
it('should dial to the max concurrency', async () => {
222-
const peerId = await createEd25519PeerId()
223-
const addrs = [
224-
multiaddr('/ip4/0.0.0.0/tcp/8000'),
225-
multiaddr('/ip4/0.0.0.0/tcp/8001'),
226-
multiaddr('/ip4/0.0.0.0/tcp/8002')
227-
]
220+
it('should only dial to the max concurrency', async () => {
221+
const peerId1 = await createEd25519PeerId()
222+
const peerId2 = await createEd25519PeerId()
223+
const peerId3 = await createEd25519PeerId()
224+
225+
const addr1 = multiaddr(`/ip4/127.0.0.1/tcp/1234/p2p/${peerId1}`)
226+
const addr2 = multiaddr(`/ip4/127.0.12.4/tcp/3210/p2p/${peerId2}`)
227+
const addr3 = multiaddr(`/ip4/123.3.11.1/tcp/2010/p2p/${peerId3}`)
228+
229+
const slowDial = async (): Promise<Connection> => {
230+
await delay(100)
231+
return mockConnection(mockMultiaddrConnection(mockDuplex(), peerId1))
232+
}
233+
234+
const actions: Record<string, (...args: any[]) => Promise<any>> = {
235+
[addr1.toString()]: slowDial,
236+
[addr2.toString()]: slowDial,
237+
[addr3.toString()]: async () => mockConnection(mockMultiaddrConnection(mockDuplex(), peerId3))
238+
}
228239

229240
const dialer = new DialQueue(localComponents, {
230-
maxParallelDials: 2,
231-
maxParallelDialsPerPeer: 10
241+
maxParallelDials: 2
232242
})
233243

234-
const deferredDial = pDefer<Connection>()
235244
const transportManagerDialStub = sinon.stub(localTM, 'dial')
236-
transportManagerDialStub.callsFake(async () => deferredDial.promise)
245+
transportManagerDialStub.callsFake(async ma => {
246+
const maStr = ma.toString()
247+
const action = actions[maStr]
237248

238-
// Perform 3 multiaddr dials
239-
void dialer.dial(addrs)
240-
241-
// Let the call stack run
242-
await delay(0)
249+
if (action != null) {
250+
return action()
251+
}
243252

244-
// We should have 2 in progress, and 1 waiting
245-
expect(transportManagerDialStub).to.have.property('callCount', 2)
253+
throw new Error(`No action found for multiaddr ${maStr}`)
254+
})
246255

247-
deferredDial.resolve(mockConnection(mockMultiaddrConnection(mockDuplex(), peerId)))
256+
// dial 3 different peers
257+
void Promise.all([
258+
dialer.dial(addr1),
259+
dialer.dial(addr2),
260+
dialer.dial(addr3)
261+
])
248262

249263
// Let the call stack run
250264
await delay(0)
251265

252-
// Only two dials should be executed, as the first dial will succeed
266+
// We should have 2 in progress, and 1 waiting
253267
expect(transportManagerDialStub).to.have.property('callCount', 2)
254-
})
255268

256-
it('should append the remote peerId to multiaddrs', async () => {
257-
const addrs = [
258-
multiaddr('/ip4/0.0.0.0/tcp/8000'),
259-
multiaddr('/ip4/0.0.0.0/tcp/8001'),
260-
multiaddr('/ip4/0.0.0.0/tcp/8002'),
261-
multiaddr('/unix/tmp/some/path.sock')
262-
]
263-
264-
// Inject data into the AddressBook
265-
await localComponents.peerStore.merge(remoteComponents.peerId, {
266-
multiaddrs: addrs
267-
})
268-
269-
const dialer = new DialQueue(localComponents, {
270-
maxParallelDialsPerPeer: 10
271-
})
272-
273-
const transportManagerDialStub = sinon.stub(localTM, 'dial')
274-
transportManagerDialStub.callsFake(async (ma) => {
275-
await delay(10)
276-
return mockConnection(mockMultiaddrConnection(mockDuplex(), remoteComponents.peerId))
277-
})
278-
279-
// Perform dial
280-
await dialer.dial(remoteComponents.peerId)
269+
// stop dials
281270
dialer.stop()
282-
283-
// Dialled each address
284-
expect(transportManagerDialStub).to.have.property('callCount', 4)
285-
286-
for (let i = 0; i < addrs.length; i++) {
287-
const call = transportManagerDialStub.getCall(i)
288-
const ma = call.args[0]
289-
290-
// should not append peerId to path multiaddrs
291-
if (ma.toString().startsWith('/unix')) {
292-
expect(ma.toString()).to.not.endWith(`/p2p/${remoteComponents.peerId.toString()}`)
293-
294-
continue
295-
}
296-
297-
expect(ma.toString()).to.endWith(`/p2p/${remoteComponents.peerId.toString()}`)
298-
}
299271
})
300272
})
301273

0 commit comments

Comments
 (0)