Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 1d19c4f

Browse files
authored
perf: expose importer concurrency controls when adding files (#2637)
* perf: expose importer concurrency controls when adding files Adds two new arguments to the cli & http interface: `--file-import-concurrency` and `--block-write-concurrency` See ipfs-inactive/js-ipfs-unixfs-importer#41 for further discussion. * chore: update deps, remove unused * fix: pass args from http * fix: hard code file concurrency for http requests * fix: fix up chunker parsing tests * chore: use ipfs-http-client branch temporarily * chore: increase bundlesize by 1kb * chore: update dep * chore: increase bundle size again
1 parent ed886f4 commit 1d19c4f

21 files changed

+50
-33
lines changed

.aegir.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ const preloadNode = MockPreloadNode.createNode()
99
const echoServer = EchoServer.createServer()
1010

1111
module.exports = {
12-
bundlesize: { maxSize: '651kB' },
12+
bundlesize: { maxSize: '652kB' },
1313
webpack: {
1414
resolve: {
1515
mainFields: ['browser', 'main'],

package.json

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@
6666
"@hapi/joi": "^15.0.0",
6767
"abort-controller": "^3.0.0",
6868
"array-shuffle": "^1.0.1",
69-
"async-iterator-all": "^1.0.0",
70-
"async-iterator-first": "^1.0.0",
7169
"async-iterator-to-pull-stream": "^1.3.0",
7270
"async-iterator-to-stream": "^1.1.0",
7371
"base32.js": "~0.1.0",
@@ -100,14 +98,14 @@
10098
"ipfs-bitswap": "^0.26.0",
10199
"ipfs-block": "~0.8.1",
102100
"ipfs-block-service": "~0.16.0",
103-
"ipfs-http-client": "^40.0.1",
101+
"ipfs-http-client": "^40.1.0",
104102
"ipfs-http-response": "~0.4.0",
105103
"ipfs-mfs": "^0.13.2",
106104
"ipfs-multipart": "^0.2.0",
107105
"ipfs-repo": "^0.30.0",
108106
"ipfs-unixfs": "~0.1.16",
109107
"ipfs-unixfs-exporter": "^0.38.0",
110-
"ipfs-unixfs-importer": "^0.40.0",
108+
"ipfs-unixfs-importer": "^0.42.0",
111109
"ipfs-utils": "~0.4.0",
112110
"ipld": "~0.25.0",
113111
"ipld-bitcoin": "~0.3.0",
@@ -123,6 +121,7 @@
123121
"is-pull-stream": "~0.0.0",
124122
"is-stream": "^2.0.0",
125123
"iso-url": "~0.4.6",
124+
"it-all": "^1.0.1",
126125
"it-pipe": "^1.0.1",
127126
"it-to-stream": "^0.1.1",
128127
"jsondiffpatch": "~0.3.11",

src/cli/commands/add.js

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,20 @@ module.exports = {
4949
default: false,
5050
describe: 'Only chunk and hash, do not write'
5151
},
52+
'block-write-concurrency': {
53+
type: 'integer',
54+
default: 10,
55+
describe: 'After a file has been chunked, this controls how many chunks to hash and add to the block store concurrently'
56+
},
5257
chunker: {
5358
default: 'size-262144',
5459
describe: 'Chunking algorithm to use, formatted like [size-{size}, rabin, rabin-{avg}, rabin-{min}-{avg}-{max}]'
5560
},
61+
'file-import-concurrency': {
62+
type: 'integer',
63+
default: 50,
64+
describe: 'How many files to import at once'
65+
},
5666
'enable-sharding-experiment': {
5767
type: 'boolean',
5868
default: false
@@ -130,7 +140,10 @@ module.exports = {
130140
wrapWithDirectory: argv.wrapWithDirectory,
131141
pin: argv.pin,
132142
chunker: argv.chunker,
133-
preload: argv.preload
143+
preload: argv.preload,
144+
nonatomic: argv.nonatomic,
145+
fileImportConcurrency: argv.fileImportConcurrency,
146+
blockWriteConcurrency: argv.blockWriteConcurrency
134147
}
135148

136149
if (options.enableShardingExperiment && argv.isDaemonOn()) {

src/core/components/block.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const multihashing = require('multihashing-async')
55
const CID = require('cids')
66
const callbackify = require('callbackify')
77
const errCode = require('err-code')
8-
const all = require('async-iterator-all')
8+
const all = require('it-all')
99
const { PinTypes } = require('./pin/pin-manager')
1010

1111
module.exports = function block (self) {

src/core/components/dag.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
const callbackify = require('callbackify')
44
const CID = require('cids')
5-
const all = require('async-iterator-all')
5+
const all = require('it-all')
66
const errCode = require('err-code')
77
const multicodec = require('multicodec')
88

src/core/components/files-mfs.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const isPullStream = require('is-pull-stream')
55
const toPullStream = require('async-iterator-to-pull-stream')
66
const toReadableStream = require('async-iterator-to-stream')
77
const pullStreamToAsyncIterator = require('pull-stream-to-async-iterator')
8-
const all = require('async-iterator-all')
8+
const all = require('it-all')
99
const nodeify = require('promise-nodeify')
1010
const PassThrough = require('stream').PassThrough
1111
const pull = require('pull-stream/pull')

src/core/components/files-regular/add-async-iterator.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ module.exports = function (self) {
2222
: Infinity
2323
}, options, {
2424
strategy: 'balanced',
25-
chunker: chunkerOptions.chunker,
26-
chunkerOptions: chunkerOptions.chunkerOptions
25+
...chunkerOptions
2726
})
2827

2928
// CID v0 is for multihashes encoded with sha2-256

src/core/components/files-regular/add.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict'
22

3-
const all = require('async-iterator-all')
3+
const all = require('it-all')
44

55
module.exports = function (self) {
66
// can't use callbackify because if `data` is a pull stream

src/core/components/files-regular/cat.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const callbackify = require('callbackify')
4-
const all = require('async-iterator-all')
4+
const all = require('it-all')
55

66
module.exports = function (self) {
77
return callbackify.variadic(async function cat (ipfsPath, options) {

src/core/components/files-regular/get.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const callbackify = require('callbackify')
4-
const all = require('async-iterator-all')
4+
const all = require('it-all')
55

66
module.exports = function (self) {
77
return callbackify.variadic(async function get (ipfsPath, options) { // eslint-disable-line require-await

src/core/components/files-regular/ls.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const callbackify = require('callbackify')
4-
const all = require('async-iterator-all')
4+
const all = require('it-all')
55

66
module.exports = function (self) {
77
return callbackify.variadic(async function ls (ipfsPath, options) { // eslint-disable-line require-await

src/core/components/files-regular/refs-local.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const callbackify = require('callbackify')
4-
const all = require('async-iterator-all')
4+
const all = require('it-all')
55

66
module.exports = function (self) {
77
return callbackify.variadic(async function refsLocal (ipfsPath, options) { // eslint-disable-line require-await

src/core/components/files-regular/refs.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const callbackify = require('callbackify')
4-
const all = require('async-iterator-all')
4+
const all = require('it-all')
55

66
module.exports = function (self) {
77
return callbackify.variadic(async function refs (ipfsPath, options) { // eslint-disable-line require-await

src/core/components/files-regular/utils.js

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,12 @@ const parseChunkerString = (chunker) => {
4545
}
4646
return {
4747
chunker: 'fixed',
48-
chunkerOptions: {
49-
maxChunkSize: size
50-
}
48+
maxChunkSize: size
5149
}
5250
} else if (chunker.startsWith('rabin')) {
5351
return {
5452
chunker: 'rabin',
55-
chunkerOptions: parseRabinString(chunker)
53+
...parseRabinString(chunker)
5654
}
5755
} else {
5856
throw new Error(`Unrecognized chunker option: ${chunker}`)

src/core/runtime/add-from-fs-nodejs.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
const callbackify = require('callbackify')
44
const globSource = require('ipfs-utils/src/files/glob-source')
5-
const all = require('async-iterator-all')
5+
const all = require('it-all')
66

77
module.exports = self => {
88
return callbackify.variadic(async (...args) => { // eslint-disable-line require-await

src/http/api/resources/block.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const multibase = require('multibase')
77
const Boom = require('@hapi/boom')
88
const { cidToString } = require('../../../utils/cid')
99
const debug = require('debug')
10-
const all = require('async-iterator-all')
10+
const all = require('it-all')
1111
const streamResponse = require('../../utils/stream-response')
1212
const log = debug('ipfs:http-api:block')
1313
log.error = debug('ipfs:http-api:block:error')

src/http/api/resources/config.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ const multipart = require('ipfs-multipart')
99
const Boom = require('@hapi/boom')
1010
const Joi = require('@hapi/joi')
1111
const { profiles } = require('../../../core/components/config')
12-
const all = require('async-iterator-all')
12+
const all = require('it-all')
1313

1414
exports.getOrSet = {
1515
// pre request handler that parses the args and returns `key` & `value` which are assigned to `request.pre.args`

src/http/api/resources/dag.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const debug = require('debug')
1111
const {
1212
cidToString
1313
} = require('../../../utils/cid')
14-
const all = require('async-iterator-all')
14+
const all = require('it-all')
1515
const log = debug('ipfs:http-api:dag')
1616
log.error = debug('ipfs:http-api:dag:error')
1717

src/http/api/resources/files-regular.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ exports.add = {
159159
'only-hash': Joi.boolean(),
160160
pin: Joi.boolean().default(true),
161161
'wrap-with-directory': Joi.boolean(),
162+
'file-import-concurrency': Joi.number().integer().min(0).default(50),
163+
'block-write-concurrency': Joi.number().integer().min(0).default(10),
162164
chunker: Joi.string(),
163165
trickle: Joi.boolean(),
164166
preload: Joi.boolean().default(true)
@@ -218,7 +220,13 @@ exports.add = {
218220
pin: request.query.pin,
219221
chunker: request.query.chunker,
220222
trickle: request.query.trickle,
221-
preload: request.query.preload
223+
preload: request.query.preload,
224+
225+
// this has to be hardcoded to 1 because we can only read one file
226+
// at a time from a http request and we have to consume it completely
227+
// before we can read the next file
228+
fileImportConcurrency: 1,
229+
blockWriteConcurrency: request.query['block-write-concurrency']
222230
})
223231
},
224232
async function (source) {

src/http/api/resources/object.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
const CID = require('cids')
44
const multipart = require('ipfs-multipart')
5-
const all = require('async-iterator-all')
5+
const all = require('it-all')
66
const dagPB = require('ipld-dag-pb')
77
const { DAGNode, DAGLink } = dagPB
88
const Joi = require('@hapi/joi')

test/core/files-regular-utils.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,27 @@ describe('files-regular/utils', () => {
2020
it('parses a fixed size string', () => {
2121
const options = utils.parseChunkerString('size-512')
2222
expect(options.chunker).to.equal('fixed')
23-
expect(options.chunkerOptions.maxChunkSize).to.equal(512)
23+
expect(options.maxChunkSize).to.equal(512)
2424
})
2525

2626
it('parses a rabin string without size', () => {
2727
const options = utils.parseChunkerString('rabin')
2828
expect(options.chunker).to.equal('rabin')
29-
expect(options.chunkerOptions.avgChunkSize).to.equal(262144)
29+
expect(options.avgChunkSize).to.equal(262144)
3030
})
3131

3232
it('parses a rabin string with only avg size', () => {
3333
const options = utils.parseChunkerString('rabin-512')
3434
expect(options.chunker).to.equal('rabin')
35-
expect(options.chunkerOptions.avgChunkSize).to.equal(512)
35+
expect(options.avgChunkSize).to.equal(512)
3636
})
3737

3838
it('parses a rabin string with min, avg, and max', () => {
3939
const options = utils.parseChunkerString('rabin-42-92-184')
4040
expect(options.chunker).to.equal('rabin')
41-
expect(options.chunkerOptions.minChunkSize).to.equal(42)
42-
expect(options.chunkerOptions.avgChunkSize).to.equal(92)
43-
expect(options.chunkerOptions.maxChunkSize).to.equal(184)
41+
expect(options.minChunkSize).to.equal(42)
42+
expect(options.avgChunkSize).to.equal(92)
43+
expect(options.maxChunkSize).to.equal(184)
4444
})
4545

4646
it('throws an error for unsupported chunker type', () => {

0 commit comments

Comments
 (0)