11import { CodeError } from '@libp2p/interface'
2+ import { PeerSet } from '@libp2p/peer-collections'
23import merge from 'it-merge'
3- import { pipe } from 'it-pipe'
4- import { codes , messages } from '../errors.js'
5- import {
6- storeAddresses ,
7- uniquePeers ,
8- requirePeers
9- } from './utils.js'
10- import type { AbortOptions , ContentRouting , PeerInfo , PeerStore , Startable } from '@libp2p/interface'
4+ import parallel from 'it-parallel'
5+ import { codes , messages } from './errors.js'
6+ import type { AbortOptions , ComponentLogger , ContentRouting , Logger , PeerInfo , PeerRouting , PeerStore , RoutingOptions , Startable } from '@libp2p/interface'
117import type { CID } from 'multiformats/cid'
128
139export interface CompoundContentRoutingInit {
@@ -16,14 +12,18 @@ export interface CompoundContentRoutingInit {
1612
1713export interface CompoundContentRoutingComponents {
1814 peerStore : PeerStore
15+ peerRouting : PeerRouting
16+ logger : ComponentLogger
1917}
2018
2119export class CompoundContentRouting implements ContentRouting , Startable {
20+ private readonly log : Logger
2221 private readonly routers : ContentRouting [ ]
2322 private started : boolean
2423 private readonly components : CompoundContentRoutingComponents
2524
2625 constructor ( components : CompoundContentRoutingComponents , init : CompoundContentRoutingInit ) {
26+ this . log = components . logger . forComponent ( 'libp2p:content-routing' )
2727 this . routers = init . routers ?? [ ]
2828 this . started = false
2929 this . components = components
@@ -44,19 +44,65 @@ export class CompoundContentRouting implements ContentRouting, Startable {
4444 /**
4545 * Iterates over all content routers in parallel to find providers of the given key
4646 */
47- async * findProviders ( key : CID , options : AbortOptions = { } ) : AsyncIterable < PeerInfo > {
47+ async * findProviders ( key : CID , options : RoutingOptions = { } ) : AsyncIterable < PeerInfo > {
4848 if ( this . routers . length === 0 ) {
4949 throw new CodeError ( 'No content routers available' , codes . ERR_NO_ROUTERS_AVAILABLE )
5050 }
5151
52- yield * pipe (
53- merge (
54- ...this . routers . map ( router => router . findProviders ( key , options ) )
55- ) ,
56- ( source ) => storeAddresses ( source , this . components . peerStore ) ,
57- ( source ) => uniquePeers ( source ) ,
58- ( source ) => requirePeers ( source )
59- )
52+ const self = this
53+ const seen = new PeerSet ( )
54+
55+ for await ( const peer of parallel (
56+ async function * ( ) {
57+ const source = merge (
58+ ...self . routers . map ( router => router . findProviders ( key , options ) )
59+ )
60+
61+ for await ( let peer of source ) {
62+ yield async ( ) => {
63+ // find multiaddrs if they are missing
64+ if ( peer . multiaddrs . length === 0 ) {
65+ try {
66+ peer = await self . components . peerRouting . findPeer ( peer . id , {
67+ ...options ,
68+ useCache : false
69+ } )
70+ } catch ( err ) {
71+ self . log . error ( 'could not find peer multiaddrs' , err )
72+ return
73+ }
74+ }
75+
76+ return peer
77+ }
78+ }
79+ } ( )
80+ ) ) {
81+ // the peer was yielded by a content router without multiaddrs and we
82+ // failed to load them
83+ if ( peer == null ) {
84+ continue
85+ }
86+
87+ // skip peers without addresses
88+ if ( peer . multiaddrs . length === 0 ) {
89+ continue
90+ }
91+
92+ // ensure we have the addresses for a given peer
93+ await this . components . peerStore . merge ( peer . id , {
94+ multiaddrs : peer . multiaddrs
95+ } )
96+
97+ // deduplicate peers
98+ if ( seen . has ( peer . id ) ) {
99+ continue
100+ }
101+
102+ seen . add ( peer . id )
103+
104+ yield peer
105+ }
60106 }
61107
62108 /**
@@ -68,7 +114,9 @@ export class CompoundContentRouting implements ContentRouting, Startable {
68114 throw new CodeError ( 'No content routers available' , codes . ERR_NO_ROUTERS_AVAILABLE )
69115 }
70116
71- await Promise . all ( this . routers . map ( async ( router ) => { await router . provide ( key , options ) } ) )
117+ await Promise . all ( this . routers . map ( async ( router ) => {
118+ await router . provide ( key , options )
119+ } ) )
72120 }
73121
74122 /**
0 commit comments