Skip to content

Commit 1f18434

Browse files
Merge pull request #25 from ipfs/pull
[WIP] Pull streams
2 parents b77cb11 + f3dfceb commit 1f18434

16 files changed

+625
-492
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ sudo: false
22
language: node_js
33
node_js:
44
- 4
5-
- 5
5+
- stable
66

77
# Make sure we have new NPM.
88
before_install:

API.md

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,52 +6,61 @@
66

77
- `id: PeerId`, the id of the local instance.
88
- `libp2p: Libp2p`, instance of the local network stack.
9-
- `datastore: Datastore`, instance of the local database (`IpfsRepo.datastore`)
9+
- `blockstore: Datastore`, instance of the local database (`IpfsRepo.blockstore`)
1010

1111
Create a new instance.
1212

13-
### `getBlock(key, cb)`
1413

15-
- `key: Multihash`
16-
- `cb: Function`
14+
### `getStream(key)`
1715

18-
Fetch a single block.
16+
- `key: Multihash|Array`
1917

20-
> Note: This is safe guarded so that the network is not asked
21-
> for blocks that are in the local `datastore`.
18+
Returns a source `pull-stream`. Values emitted are the received blocks.
2219

23-
### `getBlocks(keys, cb)`
24-
25-
- `keys: []Multihash`
26-
- `cb: Function`
20+
Example:
2721

28-
Fetch multiple blocks. The `cb` is called with a result object of the form
2922
```js
30-
{
31-
[key1]: {error: errorOrUndefined, block: blockOrUndefined},
32-
[key2]: {error: errorOrUndefined, block: blockOrUndefined},
33-
...
34-
}
23+
// Single block
24+
pull(
25+
bitswap.getStream(key),
26+
pull.collect((err, blocks) => {
27+
// blocks === [block]
28+
})
29+
)
30+
31+
// Many blocks
32+
pull(
33+
bitswap.getStream([key1, key2, key3]),
34+
pull.collect((err, blocks) => {
35+
// blocks === [block1, block2, block3]
36+
})
37+
)
3538
```
3639

37-
Where `key<i>` is the multihash of the block.
3840

39-
### `unwantBlocks(keys)`
41+
> Note: This is safe guarded so that the network is not asked
42+
> for blocks that are in the local `datastore`.
43+
4044

41-
- `keys: []Multihash`
45+
### `unwant(keys)`
46+
47+
- `keys: Mutlihash|[]Multihash`
4248

4349
Cancel previously requested keys, forcefully. That means they are removed from the
4450
wantlist independent of how many other resources requested these keys. Callbacks
4551
attached to `getBlock` are errored with `Error('manual unwant: key')`.
4652

4753
### `cancelWants(keys)`
4854

49-
- `keys: []Multihash`
55+
- `keys: Multihash|[]Multihash`
5056

5157
Cancel previously requested keys.
5258

59+
### `putStream()`
60+
61+
Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block when it was stored.
5362

54-
### `hasBlock(block, cb)`
63+
### `put(block, cb)`
5564

5665
- `block: IpfsBlock`
5766
- `cb: Function`

package.json

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,40 +33,43 @@
3333
},
3434
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
3535
"devDependencies": {
36-
"abstract-blob-store": "^3.2.0",
3736
"aegir": "^8.0.1",
3837
"buffer-loader": "0.0.1",
3938
"chai": "^3.5.0",
40-
"fs-blob-store": "^5.2.1",
41-
"idb-plus-blob-store": "^1.1.2",
42-
"ipfs-repo": "^0.8.0",
43-
"libp2p-ipfs": "^0.12.0",
44-
"lodash": "^4.13.1",
39+
"fs-pull-blob-store": "^0.3.0",
40+
"idb-pull-blob-store": "^0.4.0",
41+
"interface-pull-blob-store": "^0.5.0",
42+
"ipfs-repo": "^0.9.0",
43+
"libp2p-ipfs": "^0.13.0",
44+
"lodash": "^4.15.0",
4545
"multiaddr": "^2.0.3",
4646
"ncp": "^2.0.0",
4747
"peer-book": "^0.3.0",
4848
"peer-id": "^0.7.0",
4949
"peer-info": "^0.7.1",
50-
"rimraf": "^2.5.2",
50+
"rimraf": "^2.5.4",
5151
"safe-buffer": "^5.0.1"
5252
},
5353
"dependencies": {
54-
"async": "^2.0.0-rc.5",
55-
"bl": "^1.1.2",
54+
"async": "^2.0.1",
5655
"debug": "^2.2.0",
5756
"heap": "^0.2.6",
58-
"highland": "^3.0.0-beta.1",
5957
"ipfs-block": "^0.3.0",
60-
"length-prefixed-stream": "^1.5.0",
61-
"lodash.isequalwith": "^4.2.0",
58+
"lodash.isequalwith": "^4.4.0",
6259
"lodash.isundefined": "^3.0.1",
6360
"multihashes": "^0.2.2",
64-
"protocol-buffers": "^3.1.6"
61+
"protocol-buffers": "^3.1.6",
62+
"pull-defer": "^0.2.2",
63+
"pull-generate": "^2.2.0",
64+
"pull-length-prefixed": "^1.2.0",
65+
"pull-paramap": "^1.1.6",
66+
"pull-pushable": "^2.0.1",
67+
"pull-stream": "^3.4.5"
6568
},
6669
"contributors": [
6770
"David Dias <[email protected]>",
6871
"Richard Littauer <[email protected]>",
6972
"Stephen Whitmore <[email protected]>",
7073
"dignifiedquire <[email protected]>"
7174
]
72-
}
75+
}

src/decision/engine.js

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
'use strict'
22

33
const debug = require('debug')
4-
const _ = require('highland')
5-
const async = require('async')
64
const mh = require('multihashes')
5+
const pull = require('pull-stream')
6+
const generate = require('pull-generate')
77

88
const log = debug('bitswap:engine')
99
log.error = debug('bitswap:engine:error')
@@ -14,8 +14,8 @@ const PeerRequestQueue = require('./peer-request-queue')
1414
const Ledger = require('./ledger')
1515

1616
module.exports = class Engine {
17-
constructor (datastore, network) {
18-
this.datastore = datastore
17+
constructor (blockstore, network) {
18+
this.blockstore = blockstore
1919
this.network = network
2020

2121
// A list of of ledgers by their partner id
@@ -45,34 +45,43 @@ module.exports = class Engine {
4545
_outbox () {
4646
if (!this._running) return
4747

48-
const doIt = (cb) => {
49-
_((push, next) => {
50-
if (!this._running) return push(null, _.nil)
51-
const nextTask = this.peerRequestQueue.pop()
48+
const doIt = (cb) => pull(
49+
generate(null, (state, cb) => {
50+
log('generating', this._running)
51+
if (!this._running) {
52+
return cb(true)
53+
}
5254

53-
if (!nextTask) return push(null, _.nil)
55+
const nextTask = this.peerRequestQueue.pop()
56+
log('got task', nextTask)
57+
if (!nextTask) {
58+
return cb(true)
59+
}
5460

55-
this.datastore.get(nextTask.entry.key, (err, block) => {
56-
if (err || !block) {
57-
nextTask.done()
58-
} else {
59-
push(null, {
61+
pull(
62+
this.blockstore.getStream(nextTask.entry.key),
63+
pull.collect((err, blocks) => {
64+
log('generated', blocks)
65+
const block = blocks[0]
66+
if (err || !block) {
67+
nextTask.done()
68+
return cb(null, false)
69+
}
70+
71+
cb(null, {
6072
peer: nextTask.target,
6173
block: block,
6274
sent: () => {
6375
nextTask.done()
6476
}
6577
})
66-
}
67-
68-
next()
69-
})
70-
})
71-
.flatMap((envelope) => {
72-
return _.wrapCallback(this._sendBlock.bind(this))(envelope)
73-
})
74-
.done(cb)
75-
}
78+
})
79+
)
80+
}),
81+
pull.filter(Boolean),
82+
pull.asyncMap(this._sendBlock.bind(this)),
83+
pull.onEnd(cb)
84+
)
7685

7786
if (!this._timer) {
7887
this._timer = setTimeout(() => {
@@ -97,40 +106,43 @@ module.exports = class Engine {
97106

98107
// Handle incoming messages
99108
messageReceived (peerId, msg, cb) {
109+
const ledger = this._findOrCreate(peerId)
110+
100111
if (msg.empty) {
101112
log('received empty message from %s', peerId.toB58String())
113+
return cb()
102114
}
103115

104-
const ledger = this._findOrCreate(peerId)
105-
106116
// If the message was a full wantlist clear the current one
107117
if (msg.full) {
108118
ledger.wantlist = new Wantlist()
109119
}
110120

111121
this._processBlocks(msg.blocks, ledger)
112122
log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))
113-
async.eachSeries(
114-
msg.wantlist.values(),
115-
this._processWantlist.bind(this, ledger, peerId),
116-
(err) => {
117-
const done = (err) => async.setImmediate(() => cb(err))
118-
if (err) return done(err)
123+
124+
pull(
125+
pull.values(Array.from(msg.wantlist.values())),
126+
pull.asyncMap((entry, cb) => {
127+
this._processWantlist(ledger, peerId, entry, cb)
128+
}),
129+
pull.onEnd((err) => {
130+
if (err) return cb(err)
119131
this._outbox()
120-
done()
121-
}
132+
cb()
133+
})
122134
)
123135
}
124136

125-
receivedBlock (block) {
126-
this._processBlock(block)
137+
receivedBlock (key) {
138+
this._processBlock(key)
127139
this._outbox()
128140
}
129141

130-
_processBlock (block) {
142+
_processBlock (key) {
131143
// Check all connected peers if they want the block we received
132144
for (let l of this.ledgerMap.values()) {
133-
const entry = l.wantlistContains(block.key)
145+
const entry = l.wantlistContains(key)
134146

135147
if (entry) {
136148
this.peerRequestQueue.push(entry, l.partner)
@@ -143,13 +155,13 @@ module.exports = class Engine {
143155
log('cancel %s', mh.toB58String(entry.key))
144156
ledger.cancelWant(entry.key)
145157
this.peerRequestQueue.remove(entry.key, peerId)
146-
async.setImmediate(() => cb())
158+
setImmediate(() => cb())
147159
} else {
148160
log('wants %s - %s', mh.toB58String(entry.key), entry.priority)
149161
ledger.wants(entry.key, entry.priority)
150162

151163
// If we already have the block, serve it
152-
this.datastore.has(entry.key, (err, exists) => {
164+
this.blockstore.has(entry.key, (err, exists) => {
153165
if (err) {
154166
log('failed existence check %s', mh.toB58String(entry.key))
155167
} else if (exists) {
@@ -166,7 +178,7 @@ module.exports = class Engine {
166178
log('got block %s (%s bytes)', mh.toB58String(block.key), block.data.length)
167179
ledger.receivedBytes(block.data.length)
168180

169-
this.receivedBlock(block)
181+
this.receivedBlock(block.key)
170182
}
171183
}
172184

src/decision/peer-request-queue.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,14 @@ function taskKey (peerId, key) {
152152

153153
function partnerCompare (a, b) {
154154
// having no blocks in their wantlist means lowest priority
155-
// having both of these checks ensures stability of the sort
155+
// having both of these checks ensures stability of the sort
156156
if (a.requests === 0) return false
157157
if (b.requests === 0) return true
158158

159159
if (a.active === b.active) {
160160
// sorting by taskQueue.size() aids in cleaning out trash entries faster
161-
// if we sorted instead by requests, one peer could potentially build up
162-
// a huge number of cancelled entries in the queue resulting in a memory leak
161+
// if we sorted instead by requests, one peer could potentially build up
162+
// a huge number of cancelled entries in the queue resulting in a memory leak
163163
return a.taskQueue.size() > b.taskQueue.size()
164164
}
165165

0 commit comments

Comments
 (0)