Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit bd31372

Browse files
committed
feat: support adding async iterators
Adds an `ipfs._addAsyncIterator` method for adding async iterators and refactors all add methods to call this, as when the Great Async Iterator Migration is complete this will become the one, true method to add files to IPFS.
1 parent ec6c8ee commit bd31372

File tree

17 files changed

+529
-608
lines changed

17 files changed

+529
-608
lines changed

package.json

+6-3
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,13 @@
9898
"ipfs-block-service": "~0.15.2",
9999
"ipfs-http-client": "^34.0.0",
100100
"ipfs-http-response": "~0.3.1",
101-
"ipfs-mfs": "~0.12.0",
102-
"ipfs-multipart": "~0.1.1",
101+
"ipfs-mfs": "ipfs/js-ipfs-mfs#v0.12.x-update-ipfs-multipart",
102+
"ipfs-multipart": "^0.2.0",
103103
"ipfs-repo": "~0.26.6",
104104
"ipfs-unixfs": "~0.1.16",
105105
"ipfs-unixfs-exporter": "~0.37.7",
106106
"ipfs-unixfs-importer": "~0.39.11",
107-
"ipfs-utils": "~0.0.4",
107+
"ipfs-utils": "achingbrain/js-ipfs-utils#support-async-iterators",
108108
"ipld": "~0.24.1",
109109
"ipld-bitcoin": "~0.3.0",
110110
"ipld-dag-cbor": "~0.15.0",
@@ -119,6 +119,9 @@
119119
"is-pull-stream": "~0.0.0",
120120
"is-stream": "^2.0.0",
121121
"iso-url": "~0.4.6",
122+
"it-glob": "^0.0.1",
123+
"it-pipe": "^1.0.1",
124+
"it-to-stream": "^0.1.1",
122125
"just-safe-set": "^2.1.0",
123126
"kind-of": "^6.0.2",
124127
"libp2p": "~0.25.4",

src/cli/commands/add.js

+39-50
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
'use strict'
22

3-
const pull = require('pull-stream/pull')
4-
const through = require('pull-stream/throughs/through')
5-
const end = require('pull-stream/sinks/on-end')
63
const promisify = require('promisify-es6')
74
const getFolderSize = promisify(require('get-folder-size'))
85
const byteman = require('byteman')
96
const mh = require('multihashes')
107
const multibase = require('multibase')
11-
const toPull = require('stream-to-pull-stream')
128
const { createProgressBar } = require('../utils')
139
const { cidToString } = require('../../utils/cid')
1410
const globSource = require('../../utils/files/glob-source')
@@ -18,48 +14,6 @@ async function getTotalBytes (paths) {
1814
return sizes.reduce((total, size) => total + size, 0)
1915
}
2016

21-
function addPipeline (source, addStream, options, log) {
22-
let finalHash
23-
24-
return new Promise((resolve, reject) => {
25-
pull(
26-
source,
27-
addStream,
28-
through((file) => {
29-
const cid = finalHash = cidToString(file.hash, { base: options.cidBase })
30-
31-
if (options.silent || options.quieter) {
32-
return
33-
}
34-
35-
let message = cid
36-
37-
if (!options.quiet) {
38-
// print the hash twice if we are piping from stdin
39-
message = `added ${cid} ${options.file ? file.path || '' : cid}`.trim()
40-
}
41-
42-
log(message)
43-
}),
44-
end((err) => {
45-
if (err) {
46-
// Tweak the error message and add more relevant info for the CLI
47-
if (err.code === 'ERR_DIR_NON_RECURSIVE') {
48-
err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
49-
}
50-
return reject(err)
51-
}
52-
53-
if (options.quieter) {
54-
log(finalHash)
55-
}
56-
57-
resolve()
58-
})
59-
)
60-
})
61-
}
62-
6317
module.exports = {
6418
command: 'add [file...]',
6519

@@ -200,16 +154,51 @@ module.exports = {
200154

201155
const source = argv.file
202156
? globSource(...argv.file, { recursive: argv.recursive })
203-
: toPull.source(process.stdin) // Pipe directly to ipfs.add
157+
: process.stdin // Pipe directly to ipfs.add
204158

205-
const adder = ipfs.addPullStream(options)
159+
let finalHash
206160

207161
try {
208-
await addPipeline(source, adder, argv, log)
209-
} finally {
162+
for await (const file of ipfs._addAsyncIterator(source, options)) {
163+
164+
if (argv.silent) {
165+
continue
166+
}
167+
168+
if (argv.quieter) {
169+
finalHash = file.hash
170+
continue
171+
}
172+
173+
const cid = cidToString(file.hash, { base: argv.cidBase })
174+
let message = cid
175+
176+
if (!argv.quiet) {
177+
// print the hash twice if we are piping from stdin
178+
message = `added ${cid} ${argv.file ? file.path || '' : cid}`.trim()
179+
}
180+
181+
log(message)
182+
}
183+
} catch (err) {
210184
if (bar) {
211185
bar.terminate()
212186
}
187+
188+
// Tweak the error message and add more relevant info for the CLI
189+
if (err.code === 'ERR_DIR_NON_RECURSIVE') {
190+
err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
191+
}
192+
193+
throw err
194+
}
195+
196+
if (bar) {
197+
bar.terminate()
198+
}
199+
200+
if (argv.quieter) {
201+
log(cidToString(finalHash, { base: argv.cidBase }))
213202
}
214203
})())
215204
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
'use strict'
2+
3+
const importer = require('ipfs-unixfs-importer')
4+
const isSource = require('is-pull-stream').isSource
5+
const validateAddInput = require('ipfs-utils/src/files/add-input-validation')
6+
const { parseChunkerString } = require('./utils')
7+
const pipe = require('it-pipe')
8+
const { supportsFileReader } = require('ipfs-utils/src/supports')
9+
const toAsyncIterator = require('pull-stream-to-async-iterator')
10+
const log = require('debug')('ipfs:add')
11+
log.error = require('debug')('ipfs:add:error')
12+
13+
function noop () {}
14+
15+
module.exports = function (self) {
16+
// Internal add func that gets used by all add funcs
17+
return async function * addAsyncIterator (source, options) {
18+
options = options || {}
19+
20+
let chunkerOptions = parseChunkerString(options.chunker)
21+
22+
const opts = Object.assign({}, {
23+
shardSplitThreshold: self._options.EXPERIMENTAL.sharding
24+
? 1000
25+
: Infinity
26+
}, options, {
27+
chunker: chunkerOptions.chunker,
28+
chunkerOptions: chunkerOptions.chunkerOptions
29+
})
30+
31+
// CID v0 is for multihashes encoded with sha2-256
32+
if (opts.hashAlg && opts.cidVersion !== 1) {
33+
opts.cidVersion = 1
34+
}
35+
36+
let total = 0
37+
38+
const prog = opts.progress || noop
39+
const progress = (bytes) => {
40+
total += bytes
41+
prog(total)
42+
}
43+
44+
opts.progress = progress
45+
46+
if (Buffer.isBuffer(source) || typeof source === 'string') {
47+
source = [
48+
source
49+
]
50+
}
51+
52+
const iterator = pipe(
53+
source,
54+
validateInput(),
55+
normalizeInput(opts),
56+
doImport(self, opts),
57+
prepareFile(self, opts),
58+
preloadFile(self, opts),
59+
pinFile(self, opts)
60+
)
61+
62+
const releaseLock = await self._gcLock.readLock()
63+
64+
try {
65+
yield * iterator
66+
} finally {
67+
releaseLock()
68+
}
69+
}
70+
}
71+
72+
function validateInput () {
73+
return async function * (source) {
74+
for await (const data of source) {
75+
validateAddInput(data)
76+
77+
yield data
78+
}
79+
}
80+
}
81+
82+
function normalizeContent (content) {
83+
if (supportsFileReader && kindOf(content) === 'file') {
84+
return streamFromFileReader(content)
85+
}
86+
87+
// pull stream source
88+
if (isSource(content)) {
89+
return toAsyncIterator(content)
90+
}
91+
92+
if (typeof content === 'string') {
93+
return Buffer.from(content)
94+
}
95+
96+
if (Array.isArray(content) && content.length && !Array.isArray(content[0])) {
97+
return [content]
98+
}
99+
100+
return content
101+
}
102+
103+
function normalizeInput (opts) {
104+
return async function * (source) {
105+
for await (let data of source) {
106+
if (data.content) {
107+
data.content = normalizeContent(data.content)
108+
} else {
109+
data = {
110+
path: '',
111+
content: normalizeContent(data)
112+
}
113+
}
114+
115+
if (opts.wrapWithDirectory && !data.path) {
116+
throw new Error('Must provide a path when wrapping with a directory')
117+
}
118+
119+
yield data
120+
}
121+
}
122+
}
123+
124+
function doImport (ipfs, opts) {
125+
return function (source) {
126+
return importer(source, ipfs._ipld, opts)
127+
}
128+
}
129+
130+
function prepareFile (ipfs, opts) {
131+
return async function * (source) {
132+
for await (const file of source) {
133+
let cid = file.cid
134+
const hash = cid.toBaseEncodedString()
135+
let path = file.path ? file.path : hash
136+
137+
if (opts.wrapWithDirectory && !file.path) {
138+
path = ''
139+
}
140+
141+
if (opts.onlyHash) {
142+
yield {
143+
path,
144+
hash,
145+
size: file.unixfs.fileSize()
146+
}
147+
148+
return
149+
}
150+
151+
const node = await ipfs.object.get(file.cid, Object.assign({}, opts, { preload: false }))
152+
153+
if (opts.cidVersion === 1) {
154+
cid = cid.toV1()
155+
}
156+
157+
let size = node.size
158+
159+
if (Buffer.isBuffer(node)) {
160+
size = node.length
161+
}
162+
163+
yield {
164+
path,
165+
hash,
166+
size
167+
}
168+
}
169+
}
170+
}
171+
172+
function preloadFile (ipfs, opts) {
173+
return async function * (source) {
174+
for await (const file of source) {
175+
const isRootFile = !file.path || opts.wrapWithDirectory
176+
? file.path === ''
177+
: !file.path.includes('/')
178+
179+
const shouldPreload = isRootFile && !opts.onlyHash && opts.preload !== false
180+
181+
if (shouldPreload) {
182+
ipfs._preload(file.hash)
183+
}
184+
185+
yield file
186+
}
187+
}
188+
}
189+
190+
function pinFile (ipfs, opts) {
191+
return async function * (source) {
192+
for await (const file of source) {
193+
// Pin a file if it is the root dir of a recursive add or the single file
194+
// of a direct add.
195+
const pin = 'pin' in opts ? opts.pin : true
196+
const isRootDir = !file.path.includes('/')
197+
const shouldPin = pin && isRootDir && !opts.onlyHash && !opts.hashAlg
198+
199+
if (shouldPin) {
200+
await ipfs.pin.add(file.hash, { preload: false })
201+
}
202+
203+
yield file
204+
}
205+
}
206+
}

0 commit comments

Comments
 (0)