Skip to content
This repository was archived by the owner on Aug 11, 2021. It is now read-only.

refactor: Migrate to pull-streams #23

Merged
merged 1 commit into from
Sep 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ language: node_js
node_js:
- 4
- 5
- "stable"

# Make sure we have new NPM.
before_install:
Expand Down
59 changes: 25 additions & 34 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ const BlockService = require('ipfs-block-service')

### `new BlockService(repo)`

- `repo: Repo`

Creates a new block service backed by [IPFS Repo][repo] `repo` for storage.

### `goOnline(bitswap)`

- `bitswap: Bitswap`

Add a bitswap instance that communicates with the network to retreive blocks
that are not in the local store.

Expand All @@ -24,53 +28,40 @@ Remove the bitswap instance and fall back to offline mode.

Returns a `Boolean` indicating if the block service is online or not.

### `addBlock(block, callback(err))`
### `put(block, callback)`

- `block: Block`
- `callback: Function`

Asynchronously adds a block instance to the underlying repo.

### `addBlocks(blocks, callback(err))`
### `putStream()`

Asynchronously adds an array of block instances to the underlying repo.
Returns a through pull-stream, which `Block`s can be written to, and
that emits the meta data about the written block.

*Does not guarantee atomicity.*
### `get(multihash [, extension], callback)`

### `getBlock(multihash, callback(err, block))`
- `multihash: Multihash`
- `extension: String`, defaults to 'data'
- `callback: Function`

Asynchronously returns the block whose content multihash matches `multihash`.
Returns an error (`err.code === 'ENOENT'`) if the block does not exist.

If the block could not be found, expect `err.code` to be `'ENOENT'`.

### `getBlocks(multihashes, callback(err, blocks))`

Asynchronously returns the blocks whose content multihashes match the array
`multihashes`.

`blocks` is an object that maps each `multihash` to an object of the form

```js
{
err: Error
block: Block
}
```

Expect `blocks[multihash].err.code === 'ENOENT'` and `blocks[multihash].block
=== null` if a block did not exist.

*Does not guarantee atomicity.*
### `getStream(multihash [, extension])`

### `deleteBlock(multihash, callback(err))`
- `multihash: Multihash`
- `extension: String`, defaults to 'data'

Asynchronously deletes the block from the store with content multihash matching
`multihash`, if it exists.
Returns a source pull-stream, which emits the requested block.

### `bs.deleteBlocks(multihashes, callback(err))`
### `delete(multihashes, [, extension], callback)`

Asynchronously deletes all blocks from the store with content multihashes matching
from the array `multihashes`.
- `multihashes: Multihash|[]Multihash`
- `extension: String`, defaults to 'data'- `extension: String`, defaults to 'data'
- `callback: Function`

*Does not guarantee atomicity.*
Deletes all blocks referenced by multihashes.

[multihash]: https://github.com/jbenet/js-multihash
[multihash]: https://github.com/multiformats/js-multihash
[repo]: https://github.com/ipfs/specs/tree/master/repo
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ const BlockService = require('ipfs-block-service')
const BlockService = require('ipfs-block-service')
const Block = require('ipfs-block')
const IPFSRepo = require('ipfs-repo') // storage repo
const memstore = require('abstract-blob-store') // in-memory store
const Store = require(interface-pull-blob-store') // in-memory store

// setup a repo
var repo = new IPFSRepo('example', { stores: memstore })
var repo = new IPFSRepo('example', { stores: Store })

// create a block
const block = new Block('hello warld')
const block = new Block('hello world)
console.log(block.data)
console.log(block.key)

Expand Down
17 changes: 8 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,21 @@
},
"homepage": "https://github.com/ipfs/js-ipfs-block-service#readme",
"devDependencies": {
"aegir": "^3.0.0",
"bs58": "^3.0.0",
"aegir": "^8.0.0",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"fs-pull-blob-store": "^0.3.0",
"idb-pull-blob-store": "^0.4.0",
"ipfs-block": "^0.3.0",
"ipfs-repo": "^0.7.1",
"lodash": "^4.8.2",
"ipfs-repo": "^0.9.0",
"lodash": "^4.15.0",
"ncp": "^2.0.0",
"pre-commit": "^1.1.2",
"rimraf": "^2.5.1",
"pre-commit": "^1.1.3",
"rimraf": "^2.5.4",
"run-series": "^1.1.4"
},
"dependencies": {
"multihashes": "^0.2.2",
"pull-stream": "^3.4.5",
"run-parallel-limit": "^1.0.3"
},
"contributors": [
Expand Down
90 changes: 32 additions & 58 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const parallelLimit = require('run-parallel-limit')
const mh = require('multihashes')
const pull = require('pull-stream')

// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
Expand All @@ -24,88 +24,62 @@ module.exports = class BlockService {
return this._bitswap != null
}

addBlock (block, extension, callback) {
if (this.isOnline()) {
if (typeof extension === 'function') {
callback = extension
extension = undefined
}

this._bitswap.hasBlock(block, callback)
} else {
this._repo.datastore.put(block, extension, callback)
}
}

addBlocks (blocks, callback) {
if (!Array.isArray(blocks)) {
return callback(new Error('expects an array of Blocks'))
put (block, callback) {
callback = callback || (() => {})
if (!block) {
return callback(new Error('Missing block'))
}

parallelLimit(blocks.map((block) => (next) => {
this.addBlock(block, next)
}), 100, callback)
pull(
pull.values([block]),
this.putStream(),
pull.onEnd(callback)
)
}

getBlock (key, extension, callback) {
putStream () {
if (this.isOnline()) {
if (typeof extension === 'function') {
callback = extension
extension = undefined
}

this._bitswap.getBlock(key, callback)
} else {
this._repo.datastore.get(key, extension, callback)
return this._bitswap.putStream()
}

return this._repo.blockstore.putStream()
}

getBlocks (multihashes, extension, callback) {
get (key, extension, callback) {
if (typeof extension === 'function') {
callback = extension
extension = undefined
}

if (!Array.isArray(multihashes)) {
return callback(new Error('Invalid batch of multihashes'))
}
pull(
this.getStream(key, extension),
pull.collect((err, result) => {
if (err) return callback(err)
callback(null, result[0])
})
)
}

getStream (key, extension) {
if (this.isOnline()) {
this._bitswap.getBlocks(multihashes, (results) => {
callback(null, results)
})
return
return this._bitswap.getStream(key)
}

const results = {}
parallelLimit(multihashes.map((key) => (next) => {
this._repo.datastore.get(key, extension, (error, block) => {
results[mh.toB58String(key)] = {error, block}
next()
})
}), 100, (err) => {
callback(err, results)
})
return this._repo.blockstore.getStream(key, extension)
}

deleteBlock (key, extension, callback) {
this._repo.datastore.delete(key, extension, callback)
}

deleteBlocks (multihashes, extension, callback) {
delete (keys, extension, callback) {
if (typeof extension === 'function') {
callback = extension
extension = undefined
}

if (!Array.isArray(multihashes)) {
return callback(new Error('Invalid batch of multihashes'))
if (!Array.isArray(keys)) {
keys = [keys]
}

parallelLimit(multihashes.map((multihash) => (next) => {
this.deleteBlock(multihash, extension, next)
}), 100, (err) => {
callback(err)
})
parallelLimit(keys.map((key) => (next) => {
this._repo.blockstore.delete(key, extension, next)
}), 100, callback)
}
}
Loading