Skip to content
This repository was archived by the owner on Aug 11, 2021. It is now read-only.

Commit 694dd75

Browse files
dignifiedquiredaviddias
authored andcommitted
refactor: Migrate to pull-streams
1 parent 6019771 commit 694dd75

File tree

44 files changed

+194
-272
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+194
-272
lines changed

API.md

+25-34
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@ const BlockService = require('ipfs-block-service')
66

77
### `new BlockService(repo)`
88

9+
- `repo: Repo`
10+
911
Creates a new block service backed by [IPFS Repo][repo] `repo` for storage.
1012

1113
### `goOnline(bitswap)`
1214

15+
- `bitswap: Bitswap`
16+
1317
Add a bitswap instance that communicates with the network to retreive blocks
1418
that are not in the local store.
1519

@@ -24,53 +28,40 @@ Remove the bitswap instance and fall back to offline mode.
2428

2529
Returns a `Boolean` indicating if the block service is online or not.
2630

27-
### `addBlock(block, callback(err))`
31+
### `put(block, callback)`
32+
33+
- `block: Block`
34+
- `callback: Function`
2835

2936
Asynchronously adds a block instance to the underlying repo.
3037

31-
### `addBlocks(blocks, callback(err))`
38+
### `putStream()`
3239

33-
Asynchronously adds an array of block instances to the underlying repo.
40+
Returns a through pull-stream, which `Block`s can be written to, and
41+
that emits the meta data about the written block.
3442

35-
*Does not guarantee atomicity.*
43+
### `get(multihash [, extension], callback)`
3644

37-
### `getBlock(multihash, callback(err, block))`
45+
- `multihash: Multihash`
46+
- `extension: String`, defaults to 'data'
47+
- `callback: Function`
3848

3949
Asynchronously returns the block whose content multihash matches `multihash`.
40-
Returns an error (`err.code === 'ENOENT'`) if the block does not exist.
41-
42-
If the block could not be found, expect `err.code` to be `'ENOENT'`.
43-
44-
### `getBlocks(multihashes, callback(err, blocks))`
45-
46-
Asynchronously returns the blocks whose content multihashes match the array
47-
`multihashes`.
48-
49-
`blocks` is an object that maps each `multihash` to an object of the form
50-
51-
```js
52-
{
53-
err: Error
54-
block: Block
55-
}
56-
```
57-
58-
Expect `blocks[multihash].err.code === 'ENOENT'` and `blocks[multihash].block
59-
=== null` if a block did not exist.
6050

61-
*Does not guarantee atomicity.*
51+
### `getStream(multihash [, extension])`
6252

63-
### `deleteBlock(multihash, callback(err))`
53+
- `multihash: Multihash`
54+
- `extension: String`, defaults to 'data'
6455

65-
Asynchronously deletes the block from the store with content multihash matching
66-
`multihash`, if it exists.
56+
Returns a source pull-stream, which emits the requested block.
6757

68-
### `bs.deleteBlocks(multihashes, callback(err))`
58+
### `delete(multihashes, [, extension], callback)`
6959

70-
Asynchronously deletes all blocks from the store with content multihashes matching
71-
from the array `multihashes`.
60+
- `multihashes: Multihash|[]Multihash`
61+
- `extension: String`, defaults to 'data'- `extension: String`, defaults to 'data'
62+
- `callback: Function`
7263

73-
*Does not guarantee atomicity.*
64+
Deletes all blocks referenced by multihashes.
7465

75-
[multihash]: https://github.com/jbenet/js-multihash
66+
[multihash]: https://github.com/multiformats/js-multihash
7667
[repo]: https://github.com/ipfs/specs/tree/master/repo

README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,13 @@ const BlockService = require('ipfs-block-service')
6464
const BlockService = require('ipfs-block-service')
6565
const Block = require('ipfs-block')
6666
const IPFSRepo = require('ipfs-repo') // storage repo
67-
const memstore = require('abstract-blob-store') // in-memory store
67+
const Store = require(interface-pull-blob-store') // in-memory store
6868
6969
// setup a repo
70-
var repo = new IPFSRepo('example', { stores: memstore })
70+
var repo = new IPFSRepo('example', { stores: Store })
7171
7272
// create a block
73-
const block = new Block('hello warld')
73+
const block = new Block('hello world)
7474
console.log(block.data)
7575
console.log(block.key)
7676

package.json

+8-9
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,21 @@
3737
},
3838
"homepage": "https://github.com/ipfs/js-ipfs-block-service#readme",
3939
"devDependencies": {
40-
"aegir": "^3.0.0",
41-
"bs58": "^3.0.0",
40+
"aegir": "^8.0.0",
4241
"buffer-loader": "0.0.1",
4342
"chai": "^3.5.0",
44-
"fs-blob-store": "^5.2.1",
45-
"idb-plus-blob-store": "^1.1.2",
43+
"fs-pull-blob-store": "^0.3.0",
44+
"idb-pull-blob-store": "^0.4.0",
4645
"ipfs-block": "^0.3.0",
47-
"ipfs-repo": "^0.7.1",
48-
"lodash": "^4.8.2",
46+
"ipfs-repo": "^0.9.0",
47+
"lodash": "^4.15.0",
4948
"ncp": "^2.0.0",
50-
"pre-commit": "^1.1.2",
51-
"rimraf": "^2.5.1",
49+
"pre-commit": "^1.1.3",
50+
"rimraf": "^2.5.4",
5251
"run-series": "^1.1.4"
5352
},
5453
"dependencies": {
55-
"multihashes": "^0.2.2",
54+
"pull-stream": "^3.4.5",
5655
"run-parallel-limit": "^1.0.3"
5756
},
5857
"contributors": [

src/index.js

+32-58
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const parallelLimit = require('run-parallel-limit')
4-
const mh = require('multihashes')
4+
const pull = require('pull-stream')
55

66
// BlockService is a hybrid block datastore. It stores data in a local
77
// datastore and may retrieve data from a remote Exchange.
@@ -24,88 +24,62 @@ module.exports = class BlockService {
2424
return this._bitswap != null
2525
}
2626

27-
addBlock (block, extension, callback) {
28-
if (this.isOnline()) {
29-
if (typeof extension === 'function') {
30-
callback = extension
31-
extension = undefined
32-
}
33-
34-
this._bitswap.hasBlock(block, callback)
35-
} else {
36-
this._repo.datastore.put(block, extension, callback)
37-
}
38-
}
39-
40-
addBlocks (blocks, callback) {
41-
if (!Array.isArray(blocks)) {
42-
return callback(new Error('expects an array of Blocks'))
27+
put (block, callback) {
28+
callback = callback || (() => {})
29+
if (!block) {
30+
return callback(new Error('Missing block'))
4331
}
4432

45-
parallelLimit(blocks.map((block) => (next) => {
46-
this.addBlock(block, next)
47-
}), 100, callback)
33+
pull(
34+
pull.values([block]),
35+
this.putStream(),
36+
pull.onEnd(callback)
37+
)
4838
}
4939

50-
getBlock (key, extension, callback) {
40+
putStream () {
5141
if (this.isOnline()) {
52-
if (typeof extension === 'function') {
53-
callback = extension
54-
extension = undefined
55-
}
56-
57-
this._bitswap.getBlock(key, callback)
58-
} else {
59-
this._repo.datastore.get(key, extension, callback)
42+
return this._bitswap.putStream()
6043
}
44+
45+
return this._repo.blockstore.putStream()
6146
}
6247

63-
getBlocks (multihashes, extension, callback) {
48+
get (key, extension, callback) {
6449
if (typeof extension === 'function') {
6550
callback = extension
6651
extension = undefined
6752
}
6853

69-
if (!Array.isArray(multihashes)) {
70-
return callback(new Error('Invalid batch of multihashes'))
71-
}
54+
pull(
55+
this.getStream(key, extension),
56+
pull.collect((err, result) => {
57+
if (err) return callback(err)
58+
callback(null, result[0])
59+
})
60+
)
61+
}
7262

63+
getStream (key, extension) {
7364
if (this.isOnline()) {
74-
this._bitswap.getBlocks(multihashes, (results) => {
75-
callback(null, results)
76-
})
77-
return
65+
return this._bitswap.getStream(key)
7866
}
7967

80-
const results = {}
81-
parallelLimit(multihashes.map((key) => (next) => {
82-
this._repo.datastore.get(key, extension, (error, block) => {
83-
results[mh.toB58String(key)] = {error, block}
84-
next()
85-
})
86-
}), 100, (err) => {
87-
callback(err, results)
88-
})
68+
return this._repo.blockstore.getStream(key, extension)
8969
}
9070

91-
deleteBlock (key, extension, callback) {
92-
this._repo.datastore.delete(key, extension, callback)
93-
}
94-
95-
deleteBlocks (multihashes, extension, callback) {
71+
delete (keys, extension, callback) {
9672
if (typeof extension === 'function') {
9773
callback = extension
9874
extension = undefined
9975
}
10076

101-
if (!Array.isArray(multihashes)) {
102-
return callback(new Error('Invalid batch of multihashes'))
77+
if (!Array.isArray(keys)) {
78+
keys = [keys]
10379
}
10480

105-
parallelLimit(multihashes.map((multihash) => (next) => {
106-
this.deleteBlock(multihash, extension, next)
107-
}), 100, (err) => {
108-
callback(err)
109-
})
81+
parallelLimit(keys.map((key) => (next) => {
82+
this._repo.blockstore.delete(key, extension, next)
83+
}), 100, callback)
11084
}
11185
}

0 commit comments

Comments
 (0)