Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 6d61c8d

Browse files
committed
refactor(streams): Refactor response stream handling.
Add dagnode-stream to transform file results to DAGNode objects. Add stream-to-value to convert a response stream to a single value. Refactor request-api so that chunked JSON objects are not buffered anymore. This touches a bunch of the files pushing the transform function down to the individual commands.
1 parent 5ab51e8 commit 6d61c8d

18 files changed

+269
-149
lines changed

src/api/add.js

+10-8
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
11
'use strict'
22

33
const isStream = require('isstream')
4-
const addToDagNodesTransform = require('../add-to-dagnode-transform')
54
const promisify = require('promisify-es6')
5+
const DAGNodeStream = require('../dagnode-stream')
66

77
module.exports = (send) => {
88
return promisify((files, callback) => {
9-
const good = Buffer.isBuffer(files) ||
9+
const ok = Buffer.isBuffer(files) ||
1010
isStream.isReadable(files) ||
1111
Array.isArray(files)
1212

13-
if (!good) {
14-
callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
13+
if (!ok) {
14+
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
1515
}
1616

17-
const sendWithTransform = send.withTransform(addToDagNodesTransform)
18-
19-
return sendWithTransform({
17+
const request = {
2018
path: 'add',
2119
files: files
22-
}, callback)
20+
}
21+
22+
// Transform the response stream to DAGNode values
23+
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
24+
send.andTransform(request, transform, callback)
2325
})
2426
}

src/api/dht.js

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const streamToValue = require('../stream-to-value')
45

56
module.exports = (send) => {
67
return {
@@ -19,11 +20,13 @@ module.exports = (send) => {
1920
opts = {}
2021
}
2122

22-
send({
23+
const request = {
2324
path: 'dht/findprovs',
2425
args: args,
2526
qs: opts
26-
}, callback)
27+
}
28+
29+
send.andTransform(request, streamToValue, callback)
2730
}),
2831
get: promisify((key, opts, callback) => {
2932
if (typeof opts === 'function' &&

src/api/get.js

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
'use strict'
22

3-
const tarStreamToObjects = require('../tar-stream-to-objects')
4-
const cleanMultihash = require('../clean-multihash')
53
const promisify = require('promisify-es6')
4+
const cleanMultihash = require('../clean-multihash')
5+
const TarStreamToObjects = require('../tar-stream-to-objects')
66

77
module.exports = (send) => {
8-
return promisify(function get (path, opts, callback) {
8+
return promisify((path, opts, callback) => {
99
if (typeof opts === 'function' &&
1010
!callback) {
1111
callback = opts
@@ -26,12 +26,13 @@ module.exports = (send) => {
2626
return callback(err)
2727
}
2828

29-
var sendWithTransform = send.withTransform(tarStreamToObjects)
30-
31-
sendWithTransform({
29+
const request = {
3230
path: 'get',
3331
args: path,
3432
qs: opts
35-
}, callback)
33+
}
34+
35+
// Convert the response stream to TarStream objects
36+
send.andTransform(request, TarStreamToObjects.from, callback)
3637
})
3738
}

src/api/ping.js

+25-7
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,36 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const streamToValue = require('../stream-to-value')
45

56
module.exports = (send) => {
67
return promisify((id, callback) => {
7-
send({
8+
const request = {
89
path: 'ping',
910
args: id,
1011
qs: { n: 1 }
11-
}, function (err, res) {
12-
if (err) {
13-
return callback(err, null)
14-
}
15-
callback(null, res[1])
16-
})
12+
}
13+
14+
// Transform the response stream to a value:
15+
// { Success: <boolean>, Time: <number>, Text: <string> }
16+
const transform = (res, callback) => {
17+
streamToValue(res, (err, res) => {
18+
if (err) {
19+
return callback(err)
20+
}
21+
22+
// go-ipfs http api currently returns 3 lines for a ping.
23+
// they're a little messed, so take the correct values from each lines.
24+
const pingResult = {
25+
Success: res[1].Success,
26+
Time: res[1].Time,
27+
Text: res[2].Text
28+
}
29+
30+
callback(null, pingResult)
31+
})
32+
}
33+
34+
send.andTransform(request, transform, callback)
1735
})
1836
}

src/api/refs.js

+12-4
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,36 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const streamToValue = require('../stream-to-value')
45

56
module.exports = (send) => {
67
const refs = promisify((args, opts, callback) => {
78
if (typeof (opts) === 'function') {
89
callback = opts
910
opts = {}
1011
}
11-
return send({
12+
13+
const request = {
1214
path: 'refs',
1315
args: args,
1416
qs: opts
15-
}, callback)
17+
}
18+
19+
send.andTransform(request, streamToValue, callback)
1620
})
21+
1722
refs.local = promisify((opts, callback) => {
1823
if (typeof (opts) === 'function') {
1924
callback = opts
2025
opts = {}
2126
}
22-
return send({
27+
28+
const request = {
2329
path: 'refs',
2430
qs: opts
25-
}, callback)
31+
}
32+
33+
send.andTransform(request, streamToValue, callback)
2634
})
2735

2836
return refs

src/api/util/fs-add.js

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
'use strict'
22

33
const isNode = require('detect-node')
4-
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
54
const promisify = require('promisify-es6')
5+
const DAGNodeStream = require('../../dagnode-stream')
66

77
module.exports = (send) => {
88
return promisify((path, opts, callback) => {
@@ -28,12 +28,14 @@ module.exports = (send) => {
2828
return callback(new Error('"path" must be a string'))
2929
}
3030

31-
const sendWithTransform = send.withTransform(addToDagNodesTransform)
32-
33-
sendWithTransform({
31+
const request = {
3432
path: 'add',
3533
qs: opts,
3634
files: path
37-
}, callback)
35+
}
36+
37+
// Transform the response stream to DAGNode values
38+
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
39+
send.andTransform(request, transform, callback)
3840
})
3941
}

src/api/util/url-add.js

+7-5
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
const promisify = require('promisify-es6')
44
const once = require('once')
55
const parseUrl = require('url').parse
6-
76
const request = require('../../request')
8-
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
7+
const DAGNodeStream = require('../../dagnode-stream')
98

109
module.exports = (send) => {
1110
return promisify((url, opts, callback) => {
@@ -28,7 +27,6 @@ module.exports = (send) => {
2827
return callback(new Error('"url" param must be an http(s) url'))
2928
}
3029

31-
const sendWithTransform = send.withTransform(addToDagNodesTransform)
3230
callback = once(callback)
3331

3432
request(parseUrl(url).protocol)(url, (res) => {
@@ -37,11 +35,15 @@ module.exports = (send) => {
3735
return callback(new Error(`Failed to download with ${res.statusCode}`))
3836
}
3937

40-
sendWithTransform({
38+
const params = {
4139
path: 'add',
4240
qs: opts,
4341
files: res
44-
}, callback)
42+
}
43+
44+
// Transform the response stream to DAGNode values
45+
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
46+
send.andTransform(params, transform, callback)
4547
}).end()
4648
})
4749
}

src/dagnode-stream.js

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
'use strict'
2+
3+
const TransformStream = require('readable-stream').Transform
4+
const streamToValue = require('./stream-to-value')
5+
const getDagNode = require('./get-dagnode')
6+
7+
/*
8+
Transforms a stream of objects to DAGNodes and outputs them as objects.
9+
10+
Usage: inputStream.pipe(DAGNodeStream({ send: send }))
11+
12+
Input object format:
13+
{
14+
Name: '/path/to/file/foo.txt',
15+
Hash: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP'
16+
}
17+
18+
Output object format:
19+
{
20+
path: '/path/to/file/foo.txt',
21+
hash: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP',
22+
size: 20
23+
}
24+
*/
25+
class DAGNodeStream extends TransformStream {
26+
constructor (options) {
27+
const opts = Object.assign(options || {}, { objectMode: true })
28+
super(opts)
29+
this._send = opts.send
30+
}
31+
32+
static streamToValue (send, inputStream, callback) {
33+
const outputStream = inputStream.pipe(new DAGNodeStream({ send: send }))
34+
streamToValue(outputStream, callback)
35+
}
36+
37+
_transform (obj, enc, callback) {
38+
getDagNode(this._send, obj.Hash, (err, node) => {
39+
if (err) {
40+
return callback(err)
41+
}
42+
43+
const dag = {
44+
path: obj.Name,
45+
hash: obj.Hash,
46+
size: node.size
47+
}
48+
49+
this.push(dag)
50+
callback(null)
51+
})
52+
}
53+
}
54+
55+
module.exports = DAGNodeStream

src/get-dagnode.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
'use strict'
22

33
const DAGNode = require('ipld-dag-pb').DAGNode
4-
const bl = require('bl')
54
const parallel = require('async/parallel')
5+
const streamToValue = require('./stream-to-value')
66

77
module.exports = function (send, hash, callback) {
88
// Retrieve the object and its data in parallel, then produce a DAGNode
@@ -36,12 +36,12 @@ module.exports = function (send, hash, callback) {
3636
if (Buffer.isBuffer(stream)) {
3737
DAGNode.create(stream, object.Links, callback)
3838
} else {
39-
stream.pipe(bl(function (err, data) {
39+
streamToValue(stream, (err, data) => {
4040
if (err) {
4141
return callback(err)
4242
}
4343
DAGNode.create(data, object.Links, callback)
44-
}))
44+
})
4545
}
4646
})
4747
}

0 commit comments

Comments
 (0)