Skip to content

Commit 361cc57

Browse files
authored
fix!: allow opening remote streams (#126)
Updates client/server code in line with go implementation to open and read remote streams. Also updates all deps. BREAKING CHANGE: the stream type returned by `client.openStream` has changed
1 parent 8380f68 commit 361cc57

File tree

5 files changed

+101
-43
lines changed

5 files changed

+101
-43
lines changed

packages/libp2p-daemon-client/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@
143143
"@multiformats/multiaddr": "^10.1.8",
144144
"err-code": "^3.0.1",
145145
"it-stream-types": "^1.0.4",
146-
"multiformats": "^9.6.4"
146+
"multiformats": "^9.6.4",
147+
"uint8arraylist": "^2.3.2"
147148
},
148149
"devDependencies": {
149150
"@libp2p/components": "^2.0.0",

packages/libp2p-daemon-client/src/index.ts

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import errcode from 'err-code'
22
import { TCP } from '@libp2p/tcp'
3-
import { PSMessage, Request, Response } from '@libp2p/daemon-protocol'
3+
import { PSMessage, Request, Response, StreamInfo } from '@libp2p/daemon-protocol'
44
import { StreamHandler } from '@libp2p/daemon-protocol/stream-handler'
55
import { Multiaddr } from '@multiformats/multiaddr'
66
import { DHT } from './dht.js'
@@ -12,6 +12,10 @@ import type { Duplex } from 'it-stream-types'
1212
import type { CID } from 'multiformats/cid'
1313
import type { PeerInfo } from '@libp2p/interface-peer-info'
1414
import type { MultiaddrConnection } from '@libp2p/interface-connection'
15+
import type { Uint8ArrayList } from 'uint8arraylist'
16+
import { logger } from '@libp2p/logger'
17+
18+
const log = logger('libp2p:daemon-client')
1519

1620
class Client implements DaemonClient {
1721
private readonly multiaddr: Multiaddr
@@ -22,7 +26,6 @@ class Client implements DaemonClient {
2226
constructor (addr: Multiaddr) {
2327
this.multiaddr = addr
2428
this.tcp = new TCP()
25-
2629
this.dht = new DHT(this)
2730
this.pubsub = new Pubsub(this)
2831
}
@@ -150,7 +153,7 @@ class Client implements DaemonClient {
150153
/**
151154
* Initiate an outbound stream to a peer on one of a set of protocols.
152155
*/
153-
async openStream (peerId: PeerId, protocol: string): Promise<Duplex<Uint8Array>> {
156+
async openStream (peerId: PeerId, protocol: string): Promise<Duplex<Uint8ArrayList, Uint8Array>> {
154157
if (!isPeerId(peerId)) {
155158
throw errcode(new Error('invalid peer id received'), 'ERR_INVALID_PEER_ID')
156159
}
@@ -181,20 +184,58 @@ class Client implements DaemonClient {
181184
/**
182185
* Register a handler for inbound streams on a given protocol
183186
*/
184-
async registerStreamHandler (addr: Multiaddr, protocol: string) {
185-
if (!Multiaddr.isMultiaddr(addr)) {
186-
throw errcode(new Error('invalid multiaddr received'), 'ERR_INVALID_MULTIADDR')
187-
}
188-
187+
async registerStreamHandler (protocol: string, handler: StreamHandlerFunction): Promise<void> {
189188
if (typeof protocol !== 'string') {
190189
throw errcode(new Error('invalid protocol received'), 'ERR_INVALID_PROTOCOL')
191190
}
192191

192+
// open a tcp port, pipe any data from it to the handler function
193+
const listener = this.tcp.createListener({
194+
upgrader: passThroughUpgrader,
195+
handler: (connection) => {
196+
Promise.resolve()
197+
.then(async () => {
198+
const sh = new StreamHandler({
199+
// @ts-expect-error because we are using a passthrough upgrader, this is a MultiaddrConnection
200+
stream: connection
201+
})
202+
const message = await sh.read()
203+
204+
if (message == null) {
205+
throw errcode(new Error('Could not read open stream response'), 'ERR_OPEN_STREAM_FAILED')
206+
}
207+
208+
const response = StreamInfo.decode(message)
209+
210+
if (response.proto !== protocol) {
211+
throw errcode(new Error('Incorrect protocol'), 'ERR_OPEN_STREAM_FAILED')
212+
}
213+
214+
await handler(sh.rest())
215+
})
216+
.finally(() => {
217+
connection.close()
218+
.catch(err => {
219+
log.error(err)
220+
})
221+
listener.close()
222+
.catch(err => {
223+
log.error(err)
224+
})
225+
})
226+
}
227+
})
228+
await listener.listen(new Multiaddr('/ip4/127.0.0.1/tcp/0'))
229+
const address = listener.getAddrs()[0]
230+
231+
if (address == null) {
232+
throw errcode(new Error('Could not listen on port'), 'ERR_REGISTER_STREAM_HANDLER_FAILED')
233+
}
234+
193235
const sh = await this.send({
194236
type: Request.Type.STREAM_HANDLER,
195-
streamOpen: undefined,
196237
streamHandler: {
197-
addr: addr.bytes,
238+
addr: address.bytes,
198239
proto: [protocol]
199240
}
200241
})
@@ -215,6 +256,10 @@ export interface IdentifyResult {
215256
addrs: Multiaddr[]
216257
}
217258

259+
export interface StreamHandlerFunction {
260+
(stream: Duplex<Uint8ArrayList, Uint8Array>): Promise<void>
261+
}
262+
218263
export interface DHTClient {
219264
put: (key: Uint8Array, value: Uint8Array) => Promise<void>
220265
get: (key: Uint8Array) => Promise<Uint8Array>
@@ -238,7 +283,8 @@ export interface DaemonClient {
238283
pubsub: PubSubClient
239284

240285
send: (request: Request) => Promise<StreamHandler>
241-
openStream: (peerId: PeerId, protocol: string) => Promise<Duplex<Uint8Array>>
286+
openStream: (peerId: PeerId, protocol: string) => Promise<Duplex<Uint8ArrayList, Uint8Array>>
287+
registerStreamHandler: (protocol: string, handler: StreamHandlerFunction) => Promise<void>
242288
}
243289

244290
export function createClient (multiaddr: Multiaddr): DaemonClient {

packages/libp2p-daemon-client/test/stream.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,6 @@ describe('daemon stream client', function () {
8686
)
8787

8888
expect(data).to.have.lengthOf(1)
89-
expect(uint8ArrayToString(data[0])).to.equal('hello world')
89+
expect(uint8ArrayToString(data[0].subarray())).to.equal('hello world')
9090
})
9191
})

packages/libp2p-daemon-protocol/src/stream-handler.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export interface StreamHandlerOptions {
1414

1515
export class StreamHandler {
1616
private readonly stream: Duplex<Uint8Array>
17-
private readonly shake: Handshake
17+
private readonly shake: Handshake<Uint8Array>
1818
public decoder: Source<Uint8ArrayList>
1919
/**
2020
* Create a stream handler for connection
@@ -34,7 +34,7 @@ export class StreamHandler {
3434
// @ts-expect-error decoder is really a generator
3535
const msg = await this.decoder.next()
3636
if (msg.value != null) {
37-
return msg.value.slice()
37+
return msg.value.subarray()
3838
}
3939

4040
log('read received no value, closing stream')

packages/libp2p-daemon-server/src/index.ts

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
StreamInfo
1616
} from '@libp2p/daemon-protocol'
1717
import type { Listener } from '@libp2p/interface-transport'
18-
import type { Connection, Stream } from '@libp2p/interface-connection'
18+
import type { Connection, MultiaddrConnection, Stream } from '@libp2p/interface-connection'
1919
import type { PeerId } from '@libp2p/interface-peer-id'
2020
import type { AbortOptions } from '@libp2p/interfaces'
2121
import type { StreamHandler as StreamCallback } from '@libp2p/interface-registrar'
@@ -67,7 +67,6 @@ export class Server implements Libp2pServer {
6767
private readonly libp2p: Libp2p
6868
private readonly tcp: TCP
6969
private readonly listener: Listener
70-
private readonly streamHandlers: Record<string, StreamHandler>
7170
private readonly dhtOperations?: DHTOperations
7271
private readonly pubsubOperations?: PubSubOperations
7372

@@ -81,7 +80,6 @@ export class Server implements Libp2pServer {
8180
handler: this.handleConnection.bind(this),
8281
upgrader: passThroughUpgrader
8382
})
84-
this.streamHandlers = {}
8583
this._onExit = this._onExit.bind(this)
8684

8785
if (libp2pNode.dht != null) {
@@ -118,9 +116,7 @@ export class Server implements Libp2pServer {
118116
}
119117

120118
const { peer, proto } = request.streamOpen
121-
122119
const peerId = peerIdFromBytes(peer)
123-
124120
const connection = await this.libp2p.dial(peerId)
125121
const stream = await connection.newStream(proto)
126122

@@ -146,18 +142,18 @@ export class Server implements Libp2pServer {
146142

147143
const protocols = request.streamHandler.proto
148144
const addr = new Multiaddr(request.streamHandler.addr)
149-
const addrString = addr.toString()
150-
151-
// If we have a handler, end it
152-
if (this.streamHandlers[addrString] != null) {
153-
await this.streamHandlers[addrString].close()
154-
delete this.streamHandlers[addrString] // eslint-disable-line @typescript-eslint/no-dynamic-delete
155-
}
145+
let conn: MultiaddrConnection
146+
147+
await this.libp2p.handle(protocols, ({ connection, stream }) => {
148+
Promise.resolve()
149+
.then(async () => {
150+
// Connect the client socket with the libp2p connection
151+
// @ts-expect-error because we use a passthrough upgrader,
152+
// this is actually a MultiaddrConnection and not a Connection
153+
conn = await this.tcp.dial(addr, {
154+
upgrader: passThroughUpgrader
155+
})
156156

157-
await Promise.all(
158-
protocols.map(async (proto) => {
159-
// Connect the client socket with the libp2p connection
160-
await this.libp2p.handle(proto, ({ connection, stream }) => {
161157
const message = StreamInfo.encode({
162158
peer: connection.remotePeer.toBytes(),
163159
addr: connection.remoteAddr.bytes,
@@ -167,21 +163,36 @@ export class Server implements Libp2pServer {
167163

168164
// Tell the client about the new connection
169165
// And then begin piping the client and peer connection
170-
void pipe(
171-
[encodedMessage, stream.source],
172-
// @ts-expect-error because we use a passthrough upgrader,
173-
// this is actually a MultiaddrConnection and not a Connection
174-
clientConnection,
166+
await pipe(
167+
(async function * () {
168+
yield encodedMessage
169+
yield * stream.source
170+
}()),
171+
async function * (source) {
172+
for await (const list of source) {
173+
// convert Uint8ArrayList to Uint8Arrays for the socket
174+
yield * list
175+
}
176+
},
177+
conn,
175178
stream.sink
176-
).catch(err => {
177-
log.error(err)
178-
})
179+
)
179180
})
180-
})
181-
)
181+
.catch(async err => {
182+
log.error(err)
182183

183-
const clientConnection = await this.tcp.dial(addr, {
184-
upgrader: passThroughUpgrader
184+
if (conn != null) {
185+
await conn.close(err)
186+
}
187+
})
188+
.finally(() => {
189+
if (conn != null) {
190+
conn.close()
191+
.catch(err => {
192+
log.error(err)
193+
})
194+
}
195+
})
185196
})
186197
}
187198

0 commit comments

Comments
 (0)