Skip to content

Commit db5925b

Browse files
committed
fix: allow mss lazy select on read
Updates the multistream lazy select to support lazy select on streams that are only read from.
1 parent 13a870c commit db5925b

File tree

3 files changed

+119
-13
lines changed

3 files changed

+119
-13
lines changed

packages/multistream-select/src/handle.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,24 +55,29 @@ import type { Duplex } from 'it-stream-types'
5555
*/
5656
export async function handle <Stream extends Duplex<any, any, any>> (stream: Stream, protocols: string | string[], options: MultistreamSelectInit): Promise<ProtocolStream<Stream>> {
5757
protocols = Array.isArray(protocols) ? protocols : [protocols]
58+
options.log.trace('available protocols %s', protocols)
59+
5860
const lp = lpStream(stream, {
59-
maxDataLength: MAX_PROTOCOL_LENGTH
61+
maxDataLength: MAX_PROTOCOL_LENGTH,
62+
maxLengthLength: 2 // 2 bytes is enough to length-prefix MAX_PROTOCOL_LENGTH
6063
})
6164

6265
while (true) {
63-
options.log.trace('handle - available protocols %s', protocols)
66+
options?.log.trace('reading incoming string')
6467
const protocol = await multistream.readString(lp, options)
6568
options.log.trace('read "%s"', protocol)
6669

6770
if (protocol === PROTOCOL_ID) {
6871
options.log.trace('respond with "%s" for "%s"', PROTOCOL_ID, protocol)
6972
await multistream.write(lp, uint8ArrayFromString(`${PROTOCOL_ID}\n`), options)
73+
options.log.trace('responded with "%s" for "%s"', PROTOCOL_ID, protocol)
7074
continue
7175
}
7276

7377
if (protocols.includes(protocol)) {
7478
options.log.trace('respond with "%s" for "%s"', protocol, protocol)
7579
await multistream.write(lp, uint8ArrayFromString(`${protocol}\n`), options)
80+
options.log.trace('responded with "%s" for "%s"', protocol, protocol)
7681

7782
return { stream: lp.unwrap(), protocol }
7883
}
@@ -84,8 +89,9 @@ export async function handle <Stream extends Duplex<any, any, any>> (stream: Str
8489
uint8ArrayFromString('\n')
8590
)
8691

87-
await multistream.write(lp, protos, options)
8892
options.log.trace('respond with "%s" for %s', protocols, protocol)
93+
await multistream.write(lp, protos, options)
94+
options.log.trace('responded with "%s" for %s', protocols, protocol)
8995
continue
9096
}
9197

packages/multistream-select/src/select.ts

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { CodeError } from '@libp2p/interface/errors'
22
import { lpStream } from 'it-length-prefixed-stream'
3+
import * as varint from 'uint8-varint'
4+
import { Uint8ArrayList } from 'uint8arraylist'
35
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
46
import { MAX_PROTOCOL_LENGTH } from './constants.js'
57
import * as multistream from './multistream.js'
@@ -109,6 +111,7 @@ export async function select <Stream extends Duplex<any, any, any>> (stream: Str
109111
export function lazySelect <Stream extends Duplex<any, any, any>> (stream: Stream, protocol: string, options: MultistreamSelectInit): ProtocolStream<Stream> {
110112
const originalSink = stream.sink.bind(stream)
111113
const originalSource = stream.source
114+
let selected = false
112115

113116
const lp = lpStream({
114117
sink: originalSink,
@@ -118,20 +121,47 @@ export function lazySelect <Stream extends Duplex<any, any, any>> (stream: Strea
118121
})
119122

120123
stream.sink = async source => {
121-
options?.log.trace('lazy: write ["%s", "%s"]', PROTOCOL_ID, protocol)
122-
123-
await lp.writeV([
124-
uint8ArrayFromString(`${PROTOCOL_ID}\n`),
125-
uint8ArrayFromString(`${protocol}\n`)
126-
])
127-
128-
options?.log.trace('lazy: writing rest of "%s" stream', protocol)
129-
await lp.unwrap().sink(source)
124+
const { sink } = lp.unwrap()
125+
126+
await sink(async function * () {
127+
for await (const buf of source) {
128+
// if writing before selecting, send selection with first data chunk
129+
if (!selected) {
130+
selected = true
131+
options?.log.trace('lazy: write ["%s", "%s", data] in sink', PROTOCOL_ID, protocol)
132+
133+
const protocolString = `${protocol}\n`
134+
135+
// send protocols in first chunk of data written to transport
136+
yield new Uint8ArrayList(
137+
Uint8Array.from([19]), // length of PROTOCOL_ID plus newline
138+
uint8ArrayFromString(`${PROTOCOL_ID}\n`),
139+
varint.encode(protocolString.length),
140+
uint8ArrayFromString(protocolString),
141+
buf
142+
).subarray()
143+
144+
options?.log.trace('lazy: wrote ["%s", "%s", data] in sink', PROTOCOL_ID, protocol)
145+
} else {
146+
yield buf
147+
}
148+
}
149+
}())
130150
}
131151

132152
stream.source = (async function * () {
133-
options?.log.trace('lazy: reading multistream select header')
153+
// if reading before selecting, send selection before first data chunk
154+
if (!selected) {
155+
selected = true
156+
options?.log.trace('lazy: write ["%s", "%s", data] in source', PROTOCOL_ID, protocol)
157+
await lp.writeV([
158+
uint8ArrayFromString(`${PROTOCOL_ID}\n`),
159+
uint8ArrayFromString(`${protocol}\n`)
160+
])
161+
options?.log.trace('lazy: wrote ["%s", "%s", data] in source', PROTOCOL_ID, protocol)
162+
}
134163

164+
options?.log.trace('lazy: reading multistream select header')
135165
let response = await multistream.readString(lp, options)
136166
options?.log.trace('lazy: read multistream select header "%s"', response)
137167

packages/multistream-select/test/integration.spec.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,76 @@ describe('Dialer and Listener integration', () => {
113113
expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice())
114114
})
115115

116+
it('should handle and lazySelect that fails', async () => {
117+
const protocol = '/echo/1.0.0'
118+
const otherProtocol = '/echo/2.0.0'
119+
const pair = duplexPair<Uint8ArrayList | Uint8Array>()
120+
121+
const dialerSelection = mss.lazySelect(pair[0], protocol, {
122+
log: logger('mss:test')
123+
})
124+
expect(dialerSelection.protocol).to.equal(protocol)
125+
126+
// the listener handles the incoming stream
127+
void mss.handle(pair[1], otherProtocol, {
128+
log: logger('mss:test')
129+
})
130+
131+
// should fail when we interact with the stream
132+
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]
133+
await expect(pipe(input, dialerSelection.stream, async source => all(source)))
134+
.to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL')
135+
})
136+
137+
it('should handle and lazySelect only by reading', async () => {
138+
const protocol = '/echo/1.0.0'
139+
const pair = duplexPair<Uint8ArrayList | Uint8Array>()
140+
141+
const dialerSelection = mss.lazySelect(pair[0], protocol, {
142+
log: logger('mss:dialer')
143+
})
144+
expect(dialerSelection.protocol).to.equal(protocol)
145+
146+
// ensure stream is usable after selection
147+
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]
148+
149+
const [, dialerOut] = await Promise.all([
150+
// the listener handles the incoming stream
151+
mss.handle(pair[1], protocol, {
152+
log: logger('mss:listener')
153+
}).then(async result => {
154+
// the listener writes to the incoming stream
155+
await pipe(input, result.stream)
156+
}),
157+
158+
// the dialer just reads from the stream
159+
pipe(dialerSelection.stream, async source => all(source))
160+
])
161+
162+
expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice())
163+
})
164+
165+
it('should handle and lazySelect only by reading that fails', async () => {
166+
const protocol = '/echo/1.0.0'
167+
const otherProtocol = '/echo/2.0.0'
168+
const pair = duplexPair<Uint8ArrayList | Uint8Array>()
169+
170+
// lazy succeeds
171+
const dialerSelection = mss.lazySelect(pair[0], protocol, {
172+
log: logger('mss:dialer')
173+
})
174+
expect(dialerSelection.protocol).to.equal(protocol)
175+
176+
// the listener handles the incoming stream
177+
void mss.handle(pair[1], otherProtocol, {
178+
log: logger('mss:listener')
179+
})
180+
181+
// should fail when we interact with the stream
182+
await expect(pipe(dialerSelection.stream, async source => all(source)))
183+
.to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL')
184+
})
185+
116186
it('should abort an unhandled lazySelect', async () => {
117187
const protocol = '/echo/1.0.0'
118188
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

0 commit comments

Comments
 (0)