Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 20cf80a

Browse files
committed
feat: mplex is all here
1 parent 0df73ef commit 20cf80a

File tree

9 files changed

+1180
-31
lines changed

9 files changed

+1180
-31
lines changed

package.json

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,29 @@
2929
},
3030
"homepage": "https://github.com/libp2p/js-libp2p-multiplex#readme",
3131
"devDependencies": {
32-
"aegir": "^12.2.0",
32+
"aegir": "^13.0.1",
3333
"chai": "^4.1.2",
3434
"dirty-chai": "^2.0.1",
3535
"interface-stream-muxer": "~0.5.9",
36-
"libp2p-tcp": "^0.11.1",
36+
"libp2p-tcp": "^0.11.5",
3737
"libp2p-websockets": "~0.10.4",
3838
"pre-commit": "^1.2.2",
3939
"pull-pair": "^1.1.0"
4040
},
4141
"dependencies": {
4242
"async": "^2.6.0",
43-
"multiplex": "dignifiedquire/multiplex",
43+
"chunky": "0.0.0",
44+
"concat-stream": "^1.6.0",
45+
"debug": "^3.1.0",
46+
"duplexify": "^3.5.3",
4447
"pull-catch": "^1.0.0",
4548
"pull-stream": "^3.6.1",
4649
"pull-stream-to-stream": "^1.3.4",
47-
"pump": "^2.0.0",
48-
"stream-to-pull-stream": "^1.7.2"
50+
"pump": "^3.0.0",
51+
"readable-stream": "^2.3.4",
52+
"stream-to-pull-stream": "^1.7.2",
53+
"through2": "^2.0.3",
54+
"varint": "^5.0.0"
4955
},
5056
"contributors": [
5157
"David Dias <[email protected]>",
File renamed without changes.

src/index.js

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
11
'use strict'
22

3-
const Multiplex = require('multiplex')
43
const toStream = require('pull-stream-to-stream')
5-
6-
const MULTIPLEX_CODEC = require('./multiplex-codec')
4+
const MplexCore = require('./internals')
5+
const MULTIPLEX_CODEC = require('./codec')
76
const Muxer = require('./muxer')
87

98
const pump = require('pump')
109

1110
function create (rawConn, isListener) {
1211
const stream = toStream(rawConn)
1312

14-
// Cleanup and destroy the connection when it ends
15-
// as the converted stream doesn't emit 'close'
16-
// but .destroy will trigger a 'close' event.
13+
// Cleanup and destroy the connection when it ends as the converted stream
14+
// doesn't emit 'close' but .destroy will trigger a 'close' event.
1715
stream.on('end', () => stream.destroy())
1816

19-
const mpx = new Multiplex({
17+
const mpx = new MplexCore({
2018
halfOpen: true,
2119
initiator: !isListener
2220
})

src/internals/channel.js

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
'use strict'
2+
/* @flow */
3+
4+
const EventEmitter = require('events').EventEmitter
5+
const stream = require('readable-stream')
6+
const debug = require('debug')
7+
8+
/* :: import type Multiplex from './index'
9+
10+
export type ChannelOpts = {
11+
chunked?: bool,
12+
halfOpen?: bool,
13+
lazy?: bool
14+
}
15+
*/
16+
17+
class Channel extends stream.Duplex {
18+
constructor (name/* : Buffer | string */, plex/* : Multiplex */, opts/* : ChannelOpts = {} */) {
19+
const halfOpen = Boolean(opts.halfOpen)
20+
super({
21+
allowHalfOpen: halfOpen
22+
})
23+
24+
this.name = name
25+
this.log = debug('mplex:channel:' + this.name.toString())
26+
this.channel = 0
27+
this.initiator = false
28+
this.chunked = Boolean(opts.chunked)
29+
this.halfOpen = halfOpen
30+
this.destroyed = false
31+
this.finalized = false
32+
33+
this._multiplex = plex
34+
this._dataHeader = 0
35+
this._opened = false
36+
this._awaitDrain = 0
37+
this._lazy = Boolean(opts.lazy)
38+
39+
let finished = false
40+
let ended = false
41+
this.log('open, halfOpen: ' + this.halfOpen)
42+
43+
this.once('end', () => {
44+
this.log('end')
45+
this._read() // trigger drain
46+
47+
if (this.destroyed) {
48+
return
49+
}
50+
51+
ended = true
52+
if (finished) {
53+
this._finalize()
54+
} else if (!this.halfOpen) {
55+
this.end()
56+
}
57+
})
58+
59+
this.once('finish', function onfinish () {
60+
if (this.destroyed) {
61+
return
62+
}
63+
64+
if (!this._opened) {
65+
return this.once('open', onfinish)
66+
}
67+
68+
if (this._lazy && this.initiator) {
69+
this._open()
70+
}
71+
72+
this._multiplex._send(
73+
this.channel << 3 | (this.initiator ? 4 : 3),
74+
null
75+
)
76+
77+
finished = true
78+
79+
if (ended) {
80+
this._finalize()
81+
}
82+
})
83+
}
84+
85+
destroy (err/* : Error */) {
86+
this._destroy(err, true)
87+
}
88+
89+
_destroy (err/* : Error */, local/* : bool */) {
90+
this.log('_destroy:' + (local ? 'local' : 'remote'))
91+
if (this.destroyed) {
92+
this.log('already destroyed')
93+
return
94+
}
95+
96+
this.destroyed = true
97+
98+
const hasErrorListeners = EventEmitter.listenerCount(this, 'error') > 0
99+
100+
if (err && (!local || hasErrorListeners)) {
101+
this.emit('error', err)
102+
}
103+
104+
this.emit('close')
105+
106+
if (local && this._opened) {
107+
if (this._lazy && this.initiator) {
108+
this._open()
109+
}
110+
111+
const msg = err ? new Buffer(err.message) : null
112+
try {
113+
this._multiplex._send(
114+
this.channel << 3 | (this.initiator ? 6 : 5),
115+
msg
116+
)
117+
} catch (e) {}
118+
}
119+
120+
this._finalize()
121+
}
122+
123+
_finalize () {
124+
if (this.finalized) {
125+
return
126+
}
127+
128+
this.finalized = true
129+
this.emit('finalize')
130+
}
131+
132+
_write (data/* : Buffer */, enc/* : string */, cb/* : () => void */) {
133+
this.log('write: ', data.length)
134+
if (!this._opened) {
135+
this.once('open', () => {
136+
this._write(data, enc, cb)
137+
})
138+
return
139+
}
140+
141+
if (this.destroyed) {
142+
cb()
143+
return
144+
}
145+
146+
if (this._lazy && this.initiator) {
147+
this._open()
148+
}
149+
150+
const drained = this._multiplex._send(
151+
this._dataHeader,
152+
data
153+
)
154+
155+
if (drained) {
156+
cb()
157+
return
158+
}
159+
160+
this._multiplex._ondrain.push(cb)
161+
}
162+
163+
_read () {
164+
if (this._awaitDrain) {
165+
const drained = this._awaitDrain
166+
this._awaitDrain = 0
167+
this._multiplex._onchanneldrain(drained)
168+
}
169+
}
170+
171+
_open () {
172+
let buf = null
173+
if (Buffer.isBuffer(this.name)) {
174+
buf = this.name
175+
} else if (this.name !== this.channel.toString()) {
176+
buf = new Buffer(this.name)
177+
}
178+
179+
this._lazy = false
180+
this._multiplex._send(this.channel << 3 | 0, buf)
181+
}
182+
183+
open (channel/* : number */, initiator/* : bool */) {
184+
this.log('open: ' + channel)
185+
this.channel = channel
186+
this.initiator = initiator
187+
this._dataHeader = channel << 3 | (initiator ? 2 : 1)
188+
this._opened = true
189+
if (!this._lazy && this.initiator) this._open()
190+
this.emit('open')
191+
}
192+
}
193+
194+
module.exports = Channel

0 commit comments

Comments
 (0)