Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

refactor(streams): Refactor response stream handling. #465

Merged
merged 8 commits into from
Dec 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
},
"dependencies": {
"async": "^2.1.4",
"bl": "^1.1.2",
"bs58": "^4.0.0",
"concat-stream": "^1.6.0",
"detect-node": "^2.0.3",
Expand All @@ -43,6 +42,7 @@
"peer-id": "^0.8.1",
"peer-info": "^0.8.1",
"promisify-es6": "^1.0.2",
"pump": "^1.0.2",
"qs": "^6.3.0",
"readable-stream": "1.1.14",
"stream-http": "^2.5.0",
Expand Down
18 changes: 10 additions & 8 deletions src/api/add.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
'use strict'

const isStream = require('isstream')
const addToDagNodesTransform = require('../add-to-dagnode-transform')
const promisify = require('promisify-es6')
const DAGNodeStream = require('../dagnode-stream')

module.exports = (send) => {
return promisify((files, callback) => {
const good = Buffer.isBuffer(files) ||
const ok = Buffer.isBuffer(files) ||
isStream.isReadable(files) ||
Array.isArray(files)

if (!good) {
callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
if (!ok) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you think ok is more descriptive and intuitive than good?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine :)

return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

}

const sendWithTransform = send.withTransform(addToDagNodesTransform)

return sendWithTransform({
const request = {
path: 'add',
files: files
}, callback)
}

// Transform the response stream to DAGNode values
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
send.andTransform(request, transform, callback)
})
}
55 changes: 30 additions & 25 deletions src/api/block.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
'use strict'

const promisify = require('promisify-es6')
const bl = require('bl')
const Block = require('ipfs-block')
const multihash = require('multihashes')
const CID = require('cids')
const streamToValue = require('../stream-to-value')

module.exports = (send) => {
return {
Expand All @@ -21,25 +21,27 @@ module.exports = (send) => {
opts = {}
}

return send({
path: 'block/get',
args: args,
qs: opts
}, (err, res) => {
if (err) {
return callback(err)
}
// Transform the response from Buffer or a Stream to a Block
const transform = (res, callback) => {
if (Buffer.isBuffer(res)) {
callback(null, new Block(res))
} else {
res.pipe(bl((err, data) => {
streamToValue(res, (err, data) => {
if (err) {
return callback(err)
}
callback(null, new Block(data))
}))
})
}
})
}

const request = {
path: 'block/get',
args: args,
qs: opts
}

send.andTransform(request, transform, callback)
}),
stat: promisify((args, opts, callback) => {
// TODO this needs to be adjusted with the new go-ipfs http-api
Expand All @@ -51,19 +53,22 @@ module.exports = (send) => {
callback = opts
opts = {}
}
return send({

const request = {
path: 'block/stat',
args: args,
qs: opts
}, (err, stats) => {
if (err) {
return callback(err)
}
}

// Transform the response from { Key, Size } objects to { key, size } objects
const transform = (stats, callback) => {
callback(null, {
key: stats.Key,
size: stats.Size
})
})
}

send.andTransform(request, transform, callback)
}),
put: promisify((block, cid, callback) => {
// TODO this needs to be adjusted with the new go-ipfs http-api
Expand All @@ -81,15 +86,15 @@ module.exports = (send) => {
block = block.data
}

return send({
const request = {
path: 'block/put',
files: block
}, (err, blockInfo) => {
if (err) {
return callback(err)
}
callback(null, new Block(block))
})
}

// Transform the response to a Block
const transform = (blockInfo, callback) => callback(null, new Block(block))

send.andTransform(request, transform, callback)
})
}
}
7 changes: 5 additions & 2 deletions src/api/dht.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const promisify = require('promisify-es6')
const streamToValue = require('../stream-to-value')

module.exports = (send) => {
return {
Expand All @@ -19,11 +20,13 @@ module.exports = (send) => {
opts = {}
}

send({
const request = {
path: 'dht/findprovs',
args: args,
qs: opts
}, callback)
}

send.andTransform(request, streamToValue, callback)
}),
get: promisify((key, opts, callback) => {
if (typeof opts === 'function' &&
Expand Down
15 changes: 8 additions & 7 deletions src/api/get.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
'use strict'

const tarStreamToObjects = require('../tar-stream-to-objects')
const cleanMultihash = require('../clean-multihash')
const promisify = require('promisify-es6')
const cleanMultihash = require('../clean-multihash')
const TarStreamToObjects = require('../tar-stream-to-objects')

module.exports = (send) => {
return promisify(function get (path, opts, callback) {
return promisify((path, opts, callback) => {
if (typeof opts === 'function' &&
!callback) {
callback = opts
Expand All @@ -26,12 +26,13 @@ module.exports = (send) => {
return callback(err)
}

var sendWithTransform = send.withTransform(tarStreamToObjects)

sendWithTransform({
const request = {
path: 'get',
args: path,
qs: opts
}, callback)
}

// Convert the response stream to TarStream objects
send.andTransform(request, TarStreamToObjects, callback)
})
}
4 changes: 3 additions & 1 deletion src/api/log.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'

const pump = require('pump')
const ndjson = require('ndjson')
const promisify = require('promisify-es6')

Expand All @@ -12,7 +13,8 @@ module.exports = (send) => {
if (err) {
return callback(err)
}
callback(null, response.pipe(ndjson.parse()))
const outputStream = pump(response, ndjson.parse())
callback(null, outputStream)
})
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/api/object.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const DAGNode = dagPB.DAGNode
const DAGLink = dagPB.DAGLink
const promisify = require('promisify-es6')
const bs58 = require('bs58')
const bl = require('bl')
const streamToValue = require('../stream-to-value')
const cleanMultihash = require('../clean-multihash')
const LRU = require('lru-cache')
const lruOptions = {
Expand Down Expand Up @@ -188,7 +188,7 @@ module.exports = (send) => {
}

if (typeof result.pipe === 'function') {
result.pipe(bl(callback))
streamToValue(result, callback)
} else {
callback(null, result)
}
Expand Down
32 changes: 25 additions & 7 deletions src/api/ping.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
'use strict'

const promisify = require('promisify-es6')
const streamToValue = require('../stream-to-value')

module.exports = (send) => {
return promisify((id, callback) => {
send({
const request = {
path: 'ping',
args: id,
qs: { n: 1 }
}, function (err, res) {
if (err) {
return callback(err, null)
}
callback(null, res[1])
})
}

// Transform the response stream to a value:
// { Success: <boolean>, Time: <number>, Text: <string> }
const transform = (res, callback) => {
streamToValue(res, (err, res) => {
if (err) {
return callback(err)
}

// go-ipfs http api currently returns 3 lines for a ping.
// they're a little messed, so take the correct values from each lines.
const pingResult = {
Success: res[1].Success,
Time: res[1].Time,
Text: res[2].Text
}

callback(null, pingResult)
})
}

send.andTransform(request, transform, callback)
})
}
16 changes: 12 additions & 4 deletions src/api/refs.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
'use strict'

const promisify = require('promisify-es6')
const streamToValue = require('../stream-to-value')

module.exports = (send) => {
const refs = promisify((args, opts, callback) => {
if (typeof (opts) === 'function') {
callback = opts
opts = {}
}
return send({

const request = {
path: 'refs',
args: args,
qs: opts
}, callback)
}

send.andTransform(request, streamToValue, callback)
})

refs.local = promisify((opts, callback) => {
if (typeof (opts) === 'function') {
callback = opts
opts = {}
}
return send({

const request = {
path: 'refs',
qs: opts
}, callback)
}

send.andTransform(request, streamToValue, callback)
})

return refs
Expand Down
12 changes: 7 additions & 5 deletions src/api/util/fs-add.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use strict'

const isNode = require('detect-node')
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
const promisify = require('promisify-es6')
const DAGNodeStream = require('../../dagnode-stream')

module.exports = (send) => {
return promisify((path, opts, callback) => {
Expand All @@ -28,12 +28,14 @@ module.exports = (send) => {
return callback(new Error('"path" must be a string'))
}

const sendWithTransform = send.withTransform(addToDagNodesTransform)

sendWithTransform({
const request = {
path: 'add',
qs: opts,
files: path
}, callback)
}

// Transform the response stream to DAGNode values
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
send.andTransform(request, transform, callback)
})
}
12 changes: 7 additions & 5 deletions src/api/util/url-add.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
const promisify = require('promisify-es6')
const once = require('once')
const parseUrl = require('url').parse

const request = require('../../request')
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
const DAGNodeStream = require('../../dagnode-stream')

module.exports = (send) => {
return promisify((url, opts, callback) => {
Expand All @@ -28,7 +27,6 @@ module.exports = (send) => {
return callback(new Error('"url" param must be an http(s) url'))
}

const sendWithTransform = send.withTransform(addToDagNodesTransform)
callback = once(callback)

request(parseUrl(url).protocol)(url, (res) => {
Expand All @@ -37,11 +35,15 @@ module.exports = (send) => {
return callback(new Error(`Failed to download with ${res.statusCode}`))
}

sendWithTransform({
const params = {
path: 'add',
qs: opts,
files: res
}, callback)
}

// Transform the response stream to DAGNode values
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
send.andTransform(params, transform, callback)
}).end()
})
}
Loading