Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 753d19d

Browse files
haadcodedaviddias
authored andcommitted
refactor(streams): Refactor response stream handling. (#465)
1 parent e4b544d commit 753d19d

22 files changed

+308
-164
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
},
2424
"dependencies": {
2525
"async": "^2.1.4",
26-
"bl": "^1.1.2",
2726
"bs58": "^4.0.0",
2827
"concat-stream": "^1.6.0",
2928
"detect-node": "^2.0.3",
@@ -43,6 +42,7 @@
4342
"peer-id": "^0.8.1",
4443
"peer-info": "^0.8.1",
4544
"promisify-es6": "^1.0.2",
45+
"pump": "^1.0.2",
4646
"qs": "^6.3.0",
4747
"readable-stream": "1.1.14",
4848
"stream-http": "^2.5.0",

src/api/add.js

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
11
'use strict'
22

33
const isStream = require('isstream')
4-
const addToDagNodesTransform = require('../add-to-dagnode-transform')
54
const promisify = require('promisify-es6')
5+
const DAGNodeStream = require('../dagnode-stream')
66

77
module.exports = (send) => {
88
return promisify((files, callback) => {
9-
const good = Buffer.isBuffer(files) ||
9+
const ok = Buffer.isBuffer(files) ||
1010
isStream.isReadable(files) ||
1111
Array.isArray(files)
1212

13-
if (!good) {
14-
callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
13+
if (!ok) {
14+
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
1515
}
1616

17-
const sendWithTransform = send.withTransform(addToDagNodesTransform)
18-
19-
return sendWithTransform({
17+
const request = {
2018
path: 'add',
2119
files: files
22-
}, callback)
20+
}
21+
22+
// Transform the response stream to DAGNode values
23+
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
24+
send.andTransform(request, transform, callback)
2325
})
2426
}

src/api/block.js

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4-
const bl = require('bl')
54
const Block = require('ipfs-block')
65
const multihash = require('multihashes')
76
const CID = require('cids')
7+
const streamToValue = require('../stream-to-value')
88

99
module.exports = (send) => {
1010
return {
@@ -21,25 +21,27 @@ module.exports = (send) => {
2121
opts = {}
2222
}
2323

24-
return send({
25-
path: 'block/get',
26-
args: args,
27-
qs: opts
28-
}, (err, res) => {
29-
if (err) {
30-
return callback(err)
31-
}
24+
// Transform the response from Buffer or a Stream to a Block
25+
const transform = (res, callback) => {
3226
if (Buffer.isBuffer(res)) {
3327
callback(null, new Block(res))
3428
} else {
35-
res.pipe(bl((err, data) => {
29+
streamToValue(res, (err, data) => {
3630
if (err) {
3731
return callback(err)
3832
}
3933
callback(null, new Block(data))
40-
}))
34+
})
4135
}
42-
})
36+
}
37+
38+
const request = {
39+
path: 'block/get',
40+
args: args,
41+
qs: opts
42+
}
43+
44+
send.andTransform(request, transform, callback)
4345
}),
4446
stat: promisify((args, opts, callback) => {
4547
// TODO this needs to be adjusted with the new go-ipfs http-api
@@ -51,19 +53,22 @@ module.exports = (send) => {
5153
callback = opts
5254
opts = {}
5355
}
54-
return send({
56+
57+
const request = {
5558
path: 'block/stat',
5659
args: args,
5760
qs: opts
58-
}, (err, stats) => {
59-
if (err) {
60-
return callback(err)
61-
}
61+
}
62+
63+
// Transform the response from { Key, Size } objects to { key, size } objects
64+
const transform = (stats, callback) => {
6265
callback(null, {
6366
key: stats.Key,
6467
size: stats.Size
6568
})
66-
})
69+
}
70+
71+
send.andTransform(request, transform, callback)
6772
}),
6873
put: promisify((block, cid, callback) => {
6974
// TODO this needs to be adjusted with the new go-ipfs http-api
@@ -81,15 +86,15 @@ module.exports = (send) => {
8186
block = block.data
8287
}
8388

84-
return send({
89+
const request = {
8590
path: 'block/put',
8691
files: block
87-
}, (err, blockInfo) => {
88-
if (err) {
89-
return callback(err)
90-
}
91-
callback(null, new Block(block))
92-
})
92+
}
93+
94+
// Transform the response to a Block
95+
const transform = (blockInfo, callback) => callback(null, new Block(block))
96+
97+
send.andTransform(request, transform, callback)
9398
})
9499
}
95100
}

src/api/dht.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const streamToValue = require('../stream-to-value')
45

56
module.exports = (send) => {
67
return {
@@ -19,11 +20,13 @@ module.exports = (send) => {
1920
opts = {}
2021
}
2122

22-
send({
23+
const request = {
2324
path: 'dht/findprovs',
2425
args: args,
2526
qs: opts
26-
}, callback)
27+
}
28+
29+
send.andTransform(request, streamToValue, callback)
2730
}),
2831
get: promisify((key, opts, callback) => {
2932
if (typeof opts === 'function' &&

src/api/get.js

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
'use strict'
22

3-
const tarStreamToObjects = require('../tar-stream-to-objects')
4-
const cleanMultihash = require('../clean-multihash')
53
const promisify = require('promisify-es6')
4+
const cleanMultihash = require('../clean-multihash')
5+
const TarStreamToObjects = require('../tar-stream-to-objects')
66

77
module.exports = (send) => {
8-
return promisify(function get (path, opts, callback) {
8+
return promisify((path, opts, callback) => {
99
if (typeof opts === 'function' &&
1010
!callback) {
1111
callback = opts
@@ -26,12 +26,13 @@ module.exports = (send) => {
2626
return callback(err)
2727
}
2828

29-
var sendWithTransform = send.withTransform(tarStreamToObjects)
30-
31-
sendWithTransform({
29+
const request = {
3230
path: 'get',
3331
args: path,
3432
qs: opts
35-
}, callback)
33+
}
34+
35+
// Convert the response stream to TarStream objects
36+
send.andTransform(request, TarStreamToObjects, callback)
3637
})
3738
}

src/api/log.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
'use strict'
22

3+
const pump = require('pump')
34
const ndjson = require('ndjson')
45
const promisify = require('promisify-es6')
56

@@ -12,7 +13,8 @@ module.exports = (send) => {
1213
if (err) {
1314
return callback(err)
1415
}
15-
callback(null, response.pipe(ndjson.parse()))
16+
const outputStream = pump(response, ndjson.parse())
17+
callback(null, outputStream)
1618
})
1719
})
1820
}

src/api/object.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const DAGNode = dagPB.DAGNode
55
const DAGLink = dagPB.DAGLink
66
const promisify = require('promisify-es6')
77
const bs58 = require('bs58')
8-
const bl = require('bl')
8+
const streamToValue = require('../stream-to-value')
99
const cleanMultihash = require('../clean-multihash')
1010
const LRU = require('lru-cache')
1111
const lruOptions = {
@@ -188,7 +188,7 @@ module.exports = (send) => {
188188
}
189189

190190
if (typeof result.pipe === 'function') {
191-
result.pipe(bl(callback))
191+
streamToValue(result, callback)
192192
} else {
193193
callback(null, result)
194194
}

src/api/ping.js

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,36 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const streamToValue = require('../stream-to-value')
45

56
module.exports = (send) => {
67
return promisify((id, callback) => {
7-
send({
8+
const request = {
89
path: 'ping',
910
args: id,
1011
qs: { n: 1 }
11-
}, function (err, res) {
12-
if (err) {
13-
return callback(err, null)
14-
}
15-
callback(null, res[1])
16-
})
12+
}
13+
14+
// Transform the response stream to a value:
15+
// { Success: <boolean>, Time: <number>, Text: <string> }
16+
const transform = (res, callback) => {
17+
streamToValue(res, (err, res) => {
18+
if (err) {
19+
return callback(err)
20+
}
21+
22+
// go-ipfs http api currently returns 3 lines for a ping.
23+
// they're a little messed, so take the correct values from each lines.
24+
const pingResult = {
25+
Success: res[1].Success,
26+
Time: res[1].Time,
27+
Text: res[2].Text
28+
}
29+
30+
callback(null, pingResult)
31+
})
32+
}
33+
34+
send.andTransform(request, transform, callback)
1735
})
1836
}

src/api/refs.js

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,36 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const streamToValue = require('../stream-to-value')
45

56
module.exports = (send) => {
67
const refs = promisify((args, opts, callback) => {
78
if (typeof (opts) === 'function') {
89
callback = opts
910
opts = {}
1011
}
11-
return send({
12+
13+
const request = {
1214
path: 'refs',
1315
args: args,
1416
qs: opts
15-
}, callback)
17+
}
18+
19+
send.andTransform(request, streamToValue, callback)
1620
})
21+
1722
refs.local = promisify((opts, callback) => {
1823
if (typeof (opts) === 'function') {
1924
callback = opts
2025
opts = {}
2126
}
22-
return send({
27+
28+
const request = {
2329
path: 'refs',
2430
qs: opts
25-
}, callback)
31+
}
32+
33+
send.andTransform(request, streamToValue, callback)
2634
})
2735

2836
return refs

src/api/util/fs-add.js

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
'use strict'
22

33
const isNode = require('detect-node')
4-
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
54
const promisify = require('promisify-es6')
5+
const DAGNodeStream = require('../../dagnode-stream')
66

77
module.exports = (send) => {
88
return promisify((path, opts, callback) => {
@@ -28,12 +28,14 @@ module.exports = (send) => {
2828
return callback(new Error('"path" must be a string'))
2929
}
3030

31-
const sendWithTransform = send.withTransform(addToDagNodesTransform)
32-
33-
sendWithTransform({
31+
const request = {
3432
path: 'add',
3533
qs: opts,
3634
files: path
37-
}, callback)
35+
}
36+
37+
// Transform the response stream to DAGNode values
38+
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
39+
send.andTransform(request, transform, callback)
3840
})
3941
}

src/api/util/url-add.js

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
const promisify = require('promisify-es6')
44
const once = require('once')
55
const parseUrl = require('url').parse
6-
76
const request = require('../../request')
8-
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
7+
const DAGNodeStream = require('../../dagnode-stream')
98

109
module.exports = (send) => {
1110
return promisify((url, opts, callback) => {
@@ -28,7 +27,6 @@ module.exports = (send) => {
2827
return callback(new Error('"url" param must be an http(s) url'))
2928
}
3029

31-
const sendWithTransform = send.withTransform(addToDagNodesTransform)
3230
callback = once(callback)
3331

3432
request(parseUrl(url).protocol)(url, (res) => {
@@ -37,11 +35,15 @@ module.exports = (send) => {
3735
return callback(new Error(`Failed to download with ${res.statusCode}`))
3836
}
3937

40-
sendWithTransform({
38+
const params = {
4139
path: 'add',
4240
qs: opts,
4341
files: res
44-
}, callback)
42+
}
43+
44+
// Transform the response stream to DAGNode values
45+
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
46+
send.andTransform(params, transform, callback)
4547
}).end()
4648
})
4749
}

0 commit comments

Comments
 (0)