Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 494235d

Browse files
committed
chore: converts remaining file api methods to async iterators
1 parent 88d2448 commit 494235d

25 files changed

+410
-455
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
"array-shuffle": "^1.0.1",
6868
"async": "^2.6.1",
6969
"async-iterator-all": "^1.0.0",
70+
"async-iterator-first": "^1.0.0",
7071
"async-iterator-to-pull-stream": "^1.3.0",
7172
"async-iterator-to-stream": "^1.1.0",
7273
"base32.js": "~0.1.0",
@@ -99,7 +100,7 @@
99100
"ipfs-bitswap": "^0.26.0",
100101
"ipfs-block": "~0.8.1",
101102
"ipfs-block-service": "~0.16.0",
102-
"ipfs-http-client": "^38.2.0",
103+
"ipfs-http-client": "ipfs/js-ipfs-http-client#add-async-iterator-methods",
103104
"ipfs-http-response": "~0.3.1",
104105
"ipfs-mfs": "^0.13.0",
105106
"ipfs-multipart": "^0.2.0",

src/cli/commands/cat.js

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,9 @@ module.exports = {
2222
resolve((async () => {
2323
const ipfs = await getIpfs()
2424

25-
return new Promise((resolve, reject) => {
26-
const stream = ipfs.catReadableStream(ipfsPath, { offset, length })
27-
28-
stream.on('error', reject)
29-
stream.on('end', resolve)
30-
31-
stream.pipe(process.stdout)
32-
})
25+
for await (const buf of ipfs._catAsyncIterator(ipfsPath, { offset, length })) {
26+
process.stdout.write(buf)
27+
}
3328
})())
3429
}
3530
}

src/core/components/files-regular/add.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const all = require('async-iterator-all')
55
module.exports = function (self) {
66
// can't use callbackify because if `data` is a pull stream
77
// it thinks we are passing a callback. This is why we can't have nice things.
8-
return (data, options, callback) => {
8+
return function add (data, options, callback) {
99
if (!callback && typeof options === 'function') {
1010
callback = options
1111
options = {}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
'use strict'
2+
3+
const exporter = require('ipfs-unixfs-exporter')
4+
const { normalizePath } = require('./utils')
5+
6+
module.exports = function (self) {
7+
return async function * catAsyncIterator (ipfsPath, options) {
8+
options = options || {}
9+
10+
ipfsPath = normalizePath(ipfsPath)
11+
12+
if (options.preload !== false) {
13+
const pathComponents = ipfsPath.split('/')
14+
self._preload(pathComponents[0])
15+
}
16+
17+
const file = await exporter(ipfsPath, self._ipld, options)
18+
19+
// File may not have unixfs prop if small & imported with rawLeaves true
20+
if (file.unixfs && file.unixfs.type.includes('dir')) {
21+
throw new Error('this dag node is a directory')
22+
}
23+
24+
if (!file.content) {
25+
throw new Error('this dag node has no content')
26+
}
27+
28+
yield * file.content(options)
29+
}
30+
}
Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,9 @@
11
'use strict'
22

3-
const exporter = require('ipfs-unixfs-exporter')
4-
const deferred = require('pull-defer')
53
const toPullStream = require('async-iterator-to-pull-stream')
6-
const { normalizePath } = require('./utils')
74

85
module.exports = function (self) {
96
return function catPullStream (ipfsPath, options) {
10-
if (typeof ipfsPath === 'function') {
11-
throw new Error('You must supply an ipfsPath')
12-
}
13-
14-
options = options || {}
15-
16-
ipfsPath = normalizePath(ipfsPath)
17-
const pathComponents = ipfsPath.split('/')
18-
19-
if (options.preload !== false) {
20-
self._preload(pathComponents[0])
21-
}
22-
23-
const d = deferred.source()
24-
25-
exporter(ipfsPath, self._ipld, options)
26-
.then(file => {
27-
// File may not have unixfs prop if small & imported with rawLeaves true
28-
if (file.unixfs && file.unixfs.type.includes('dir')) {
29-
return d.abort(new Error('this dag node is a directory'))
30-
}
31-
32-
if (!file.content) {
33-
return d.abort(new Error('this dag node has no content'))
34-
}
35-
36-
d.resolve(toPullStream.source(file.content(options)))
37-
}, err => {
38-
d.abort(err)
39-
})
40-
41-
return d
7+
return toPullStream.source(self._catAsyncIterator(ipfsPath, options))
428
}
439
}
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
'use strict'
22

3-
const toStream = require('pull-stream-to-stream')
3+
const toStream = require('it-to-stream')
44

55
module.exports = function (self) {
6-
return (ipfsPath, options) => toStream.source(self.catPullStream(ipfsPath, options))
6+
return function catReadableStream (ipfsPath, options) {
7+
return toStream.readable(self._catAsyncIterator(ipfsPath, options), {
8+
objectMode: true
9+
})
10+
}
711
}
Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,10 @@
11
'use strict'
22

3-
const promisify = require('promisify-es6')
4-
const pull = require('pull-stream')
3+
const callbackify = require('callbackify')
4+
const all = require('async-iterator-all')
55

66
module.exports = function (self) {
7-
return promisify((ipfsPath, options, callback) => {
8-
if (typeof options === 'function') {
9-
callback = options
10-
options = {}
11-
}
12-
13-
pull(
14-
self.catPullStream(ipfsPath, options),
15-
pull.collect((err, buffers) => {
16-
if (err) { return callback(err) }
17-
callback(null, Buffer.concat(buffers))
18-
})
19-
)
7+
return callbackify.variadic(async function cat (ipfsPath, options) {
8+
return Buffer.concat(await all(self._catAsyncIterator(ipfsPath, options)))
209
})
2110
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
'use strict'
2+
3+
const exporter = require('ipfs-unixfs-exporter')
4+
const errCode = require('err-code')
5+
const { normalizePath, mapFile } = require('./utils')
6+
7+
module.exports = function (self) {
8+
return async function * getAsyncIterator (ipfsPath, options) {
9+
options = options || {}
10+
11+
if (options.preload !== false) {
12+
let pathComponents
13+
14+
try {
15+
pathComponents = normalizePath(ipfsPath).split('/')
16+
} catch (err) {
17+
throw errCode(err, 'ERR_INVALID_PATH')
18+
}
19+
20+
self._preload(pathComponents[0])
21+
}
22+
23+
for await (const file of exporter.recursive(ipfsPath, self._ipld, options)) {
24+
yield mapFile(file, {
25+
...options,
26+
includeContent: true
27+
})
28+
}
29+
}
30+
}
Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,20 @@
11
'use strict'
22

3-
const exporter = require('ipfs-unixfs-exporter')
43
const toPullStream = require('async-iterator-to-pull-stream')
5-
const errCode = require('err-code')
64
const pull = require('pull-stream/pull')
7-
const pullError = require('pull-stream/sources/error')
85
const map = require('pull-stream/throughs/map')
9-
const { normalizePath, mapFile } = require('./utils')
106

117
module.exports = function (self) {
12-
return (ipfsPath, options) => {
13-
options = options || {}
14-
15-
if (options.preload !== false) {
16-
let pathComponents
17-
18-
try {
19-
pathComponents = normalizePath(ipfsPath).split('/')
20-
} catch (err) {
21-
return pullError(errCode(err, 'ERR_INVALID_PATH'))
22-
}
23-
24-
self._preload(pathComponents[0])
25-
}
26-
8+
return function getPullStream (ipfsPath, options) {
279
return pull(
28-
toPullStream.source(exporter.recursive(ipfsPath, self._ipld, options)),
29-
map(mapFile({
30-
...options,
31-
includeContent: true
32-
}))
10+
toPullStream.source(self._getAsyncIterator(ipfsPath, options)),
11+
map(file => {
12+
if (file.content) {
13+
file.content = toPullStream.source(file.content())
14+
}
15+
16+
return file
17+
})
3318
)
3419
}
3520
}
Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,19 @@
11
'use strict'
22

3-
const pull = require('pull-stream')
4-
const toStream = require('pull-stream-to-stream')
3+
const toStream = require('it-to-stream')
54

65
module.exports = function (self) {
7-
return (ipfsPath, options) => {
8-
options = options || {}
6+
return function getReadableStream (ipfsPath, options) {
7+
return toStream.readable((async function * () {
8+
for await (const file of self._getAsyncIterator(ipfsPath, options)) {
9+
if (file.content) {
10+
file.content = toStream.readable(file.content())
11+
}
912

10-
return toStream.source(
11-
pull(
12-
self.getPullStream(ipfsPath, options),
13-
pull.map((file) => {
14-
if (file.content) {
15-
file.content = toStream.source(file.content)
16-
file.content.pause()
17-
}
18-
19-
return file
20-
})
21-
)
22-
)
13+
yield file
14+
}
15+
}()), {
16+
objectMode: true
17+
})
2318
}
2419
}
Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,18 @@
11
'use strict'
22

3-
const promisify = require('promisify-es6')
4-
const pull = require('pull-stream')
3+
const callbackify = require('callbackify')
4+
const all = require('async-iterator-all')
55

66
module.exports = function (self) {
7-
return promisify((ipfsPath, options, callback) => {
8-
if (typeof options === 'function') {
9-
callback = options
10-
options = {}
11-
}
12-
13-
options = options || {}
14-
15-
pull(
16-
self.getPullStream(ipfsPath, options),
17-
pull.asyncMap((file, cb) => {
7+
return callbackify.variadic(async function get (ipfsPath, options) { // eslint-disable-line require-await
8+
return all(async function * () {
9+
for await (const file of self._getAsyncIterator(ipfsPath, options)) {
1810
if (file.content) {
19-
pull(
20-
file.content,
21-
pull.collect((err, buffers) => {
22-
if (err) { return cb(err) }
23-
file.content = Buffer.concat(buffers)
24-
cb(null, file)
25-
})
26-
)
27-
} else {
28-
cb(null, file)
11+
file.content = Buffer.concat(await all(file.content()))
2912
}
30-
}),
31-
pull.collect(callback)
32-
)
13+
14+
yield file
15+
}
16+
}())
3317
})
3418
}

src/core/components/files-regular/index.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,23 @@ module.exports = (self) => {
1212
cat: require('./cat')(self),
1313
catPullStream: require('./cat-pull-stream')(self),
1414
catReadableStream: require('./cat-readable-stream')(self),
15+
_catAsyncIterator: require('./cat-async-iterator')(self),
1516
get: require('./get')(self),
1617
getPullStream: require('./get-pull-stream')(self),
1718
getReadableStream: require('./get-readable-stream')(self),
19+
_getAsyncIterator: require('./get-async-iterator')(self),
1820
ls: require('./ls')(self),
1921
lsPullStream: require('./ls-pull-stream')(self),
2022
lsReadableStream: require('./ls-readable-stream')(self),
23+
_lsAsyncIterator: require('./ls-async-iterator')(self),
2124
refs: require('./refs')(self),
2225
refsReadableStream: require('./refs-readable-stream')(self),
23-
refsPullStream: require('./refs-pull-stream')(self)
26+
refsPullStream: require('./refs-pull-stream')(self),
27+
_refsAsyncIterator: require('./refs-async-iterator')(self)
2428
}
2529
filesRegular.refs.local = require('./refs-local')(self)
2630
filesRegular.refs.localReadableStream = require('./refs-local-readable-stream')(self)
2731
filesRegular.refs.localPullStream = require('./refs-local-pull-stream')(self)
32+
filesRegular.refs._localAsyncIterator = require('./refs-local-async-iterator')(self)
2833
return filesRegular
2934
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
'use strict'
2+
3+
const exporter = require('ipfs-unixfs-exporter')
4+
const errCode = require('err-code')
5+
const { normalizePath, mapFile } = require('./utils')
6+
7+
module.exports = function (self) {
8+
return async function * lsAsyncIterator (ipfsPath, options) {
9+
options = options || {}
10+
11+
const path = normalizePath(ipfsPath)
12+
const recursive = options.recursive
13+
const pathComponents = path.split('/')
14+
15+
if (options.preload !== false) {
16+
self._preload(pathComponents[0])
17+
}
18+
19+
const file = await exporter(ipfsPath, self._ipld, options)
20+
21+
if (!file.unixfs) {
22+
throw errCode(new Error('dag node was not a UnixFS node'), 'ENOTUNIXFS')
23+
}
24+
25+
if (file.unixfs.type === 'file') {
26+
return mapFile(file, options)
27+
}
28+
29+
if (file.unixfs.type.includes('dir')) {
30+
if (recursive) {
31+
for await (const child of exporter.recursive(file.cid, self._ipld, options)) {
32+
if (file.cid.toBaseEncodedString() === child.cid.toBaseEncodedString()) {
33+
continue
34+
}
35+
36+
yield mapFile(child, options)
37+
}
38+
39+
return
40+
}
41+
42+
for await (let child of file.content()) {
43+
child = mapFile(child, options)
44+
child.depth--
45+
46+
yield child
47+
}
48+
49+
return
50+
}
51+
52+
throw errCode(new Error(`Unknown UnixFS type ${file.unixfs.type}`), 'EUNKNOWNUNIXFSTYPE')
53+
}
54+
}

0 commit comments

Comments
 (0)