
Commit 40908cc
perf: concurrent file import
Adds two new options:

`fileImportConcurrency`: controls the number of files that are imported concurrently. You may wish to set this high if you are importing lots of small files.

`blockWriteConcurrency`: controls how many blocks from each file we write to disk at the same time. Setting this high when writing large files will significantly increase import speed, though having it high when `fileImportConcurrency` is also high can swamp the process.

It also:

1. Flattens module options, because validating deep objects was clunky and the separation of access to config sub-objects within this module wasn't very good
2. Replaces `superstruct` and `deep-extend` with `merge-options`, which is better suited to merging options and is smaller
3. Replaces the `async-iterator-*` modules with the more zeitgeisty `it-*` namespace

Supersedes #38, sort of.
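A rough usage sketch of the two new options (not part of the commit: the `ipfs-unixfs-importer` require name and the in-memory IPLD setup follow this repo's own tests, and the file itself is hypothetical):

```js
'use strict'

const importer = require('ipfs-unixfs-importer')
const IPLD = require('ipld')
const inMemory = require('ipld-in-memory')

async function main () {
  const ipld = await inMemory(IPLD)

  // One big file: keep fileImportConcurrency low and raise
  // blockWriteConcurrency so its blocks are hashed and written in parallel
  const source = [{
    path: 'big.bin',
    content: Buffer.alloc(100 * 1024 * 1024)
  }]

  for await (const entry of importer(source, ipld, {
    fileImportConcurrency: 1,
    blockWriteConcurrency: 50
  })) {
    console.log(entry.path, entry.cid.toString())
  }
}

main()
```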
1 parent: d03441f

22 files changed, +116 -147 lines

README.md

+5 -4

```diff
@@ -124,10 +124,9 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
 - `chunker` (string, defaults to `"fixed"`): the chunking strategy. Supports:
   - `fixed`
   - `rabin`
-- `chunkerOptions` (object, optional): the options for the chunker. Defaults to an object with the following properties:
-  - `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only)
-  - `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only)
-  - `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size
+- `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only)
+- `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only)
+- `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size
 - `strategy` (string, defaults to `"balanced"`): the DAG builder strategy name. Supports:
   - `flat`: flat list of chunks
   - `balanced`: builds a balanced tree
@@ -144,6 +143,8 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
 - `cidVersion` (integer, default 0): the CID version to use when storing the data (storage keys are based on the CID, _including_ it's version)
 - `rawLeaves` (boolean, defaults to false): When a file would span multiple DAGNodes, if this is true the leaf nodes will not be wrapped in `UnixFS` protobufs and will instead contain the raw file bytes
 - `leafType` (string, defaults to `'file'`) what type of UnixFS node leaves should be - can be `'file'` or `'raw'` (ignored when `rawLeaves` is `true`)
+- `blockWriteConcurrency` (positive integer, defaults to 10) How many blocks to hash and write to the block store concurrently. For small numbers of large files this should be high (e.g. 50).
+- `fileImportConcurrency` (number, defaults to 50) How many files to import concurrently. For large numbers of small files this should be high (e.g. 50).
 
 [ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
 [UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
```
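With the flattened layout documented above, chunker settings now sit at the top level of the options object instead of under `chunkerOptions`. A minimal sketch (the chunk sizes are illustrative; `source` and `ipld` as in the example further up):

```js
const importer = require('ipfs-unixfs-importer')

const options = {
  chunker: 'rabin',
  // previously nested under `chunkerOptions`, now top-level
  avgChunkSize: 262144,
  minChunkSize: 131072,
  maxChunkSize: 524288
}

// for await (const entry of importer(source, ipld, options)) { ... }
```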

package.json

+10 -8

```diff
@@ -38,33 +38,35 @@
   "homepage": "https://github.com/ipfs/js-ipfs-unixfs-importer#readme",
   "devDependencies": {
     "aegir": "^20.0.0",
-    "async-iterator-buffer-stream": "^1.0.0",
-    "async-iterator-last": "^1.0.0",
     "chai": "^4.2.0",
     "cids": "~0.7.1",
+    "deep-extend": "~0.6.0",
     "detect-node": "^2.0.4",
     "dirty-chai": "^2.0.1",
     "ipfs-unixfs-exporter": "^0.39.0",
     "ipld": "^0.25.0",
     "ipld-in-memory": "^3.0.0",
+    "it-buffer-stream": "^1.0.0",
+    "it-last": "^1.0.0",
     "multihashes": "~0.4.14",
     "nyc": "^14.0.0",
     "sinon": "^7.1.0"
   },
   "dependencies": {
-    "async-iterator-all": "^1.0.0",
-    "async-iterator-batch": "~0.0.1",
-    "async-iterator-first": "^1.0.0",
     "bl": "^4.0.0",
-    "deep-extend": "~0.6.0",
     "err-code": "^2.0.0",
     "hamt-sharding": "~0.0.2",
     "ipfs-unixfs": "^0.2.0",
     "ipld-dag-pb": "^0.18.0",
+    "it-all": "^1.0.1",
+    "it-batch": "^1.0.3",
+    "it-first": "^1.0.1",
+    "it-flat-batch": "^1.0.2",
+    "it-parallel-batch": "1.0.2",
+    "merge-options": "^2.0.0",
     "multicodec": "~0.5.1",
     "multihashing-async": "^0.8.0",
-    "rabin-wasm": "~0.0.8",
-    "superstruct": "^0.8.2"
+    "rabin-wasm": "~0.0.8"
   },
   "contributors": [
     "Alan Shaw <[email protected]>",
```

src/dag-builder/file/balanced.js

+1 -1

```diff
@@ -1,6 +1,6 @@
 'use strict'
 
-const batch = require('async-iterator-batch')
+const batch = require('it-flat-batch')
 
 async function * balanced (source, reduce, options) {
   yield await reduceToParents(source, reduce, options)
```

src/dag-builder/file/flat.js

+2 -2

```diff
@@ -1,11 +1,11 @@
 'use strict'
 
-const batch = require('async-iterator-batch')
+const batch = require('it-flat-batch')
 
 module.exports = async function * (source, reduce) {
   const roots = []
 
-  for await (const chunk of batch(source, Infinity)) {
+  for await (const chunk of batch(source, Number.MAX_SAFE_INTEGER)) {
     roots.push(await reduce(chunk))
   }
 
```
src/dag-builder/file/index.js

+39 -31

```diff
@@ -7,54 +7,62 @@ const {
   DAGNode,
   DAGLink
 } = require('ipld-dag-pb')
-const all = require('async-iterator-all')
+const all = require('it-all')
+const parallelBatch = require('it-parallel-batch')
 
 const dagBuilders = {
   flat: require('./flat'),
   balanced: require('./balanced'),
   trickle: require('./trickle')
 }
 
-async function * buildFile (file, source, ipld, options) {
-  let count = -1
-  let previous
-
+async function * importBuffer (file, source, ipld, options) {
   for await (const buffer of source) {
-    count++
-    options.progress(buffer.length)
-    let node
-    let unixfs
+    yield async () => {
+      options.progress(buffer.length)
+      let node
+      let unixfs
 
-    const opts = {
-      ...options
-    }
+      const opts = {
+        ...options
+      }
 
-    if (options.rawLeaves) {
-      node = buffer
+      if (options.rawLeaves) {
+        node = buffer
 
-      opts.codec = 'raw'
-      opts.cidVersion = 1
-    } else {
-      unixfs = new UnixFS(options.leafType, buffer)
+        opts.codec = 'raw'
+        opts.cidVersion = 1
+      } else {
+        unixfs = new UnixFS(options.leafType, buffer)
 
-      if (file.mtime) {
-        unixfs.mtime = file.mtime
-      }
+        if (file.mtime) {
+          unixfs.mtime = file.mtime
+        }
+
+        if (file.mode) {
+          unixfs.mode = file.mode
+        }
 
-      if (file.mode) {
-        unixfs.mode = file.mode
+        node = new DAGNode(unixfs.marshal())
       }
 
-    node = new DAGNode(unixfs.marshal())
+      const cid = await persist(node, ipld, opts)
+
+      return {
+        cid: cid,
+        unixfs,
+        node
+      }
     }
+  }
+}
 
-    const cid = await persist(node, ipld, opts)
+async function * buildFileBatch (file, source, ipld, options) {
+  let count = -1
+  let previous
 
-    const entry = {
-      cid: cid,
-      unixfs,
-      node
-    }
+  for await (const entry of parallelBatch(importBuffer(file, source, ipld, options), options.blockWriteConcurrency)) {
+    count++
 
     if (count === 0) {
       previous = entry
@@ -149,7 +157,7 @@ const fileBuilder = async (file, source, ipld, options) => {
     throw errCode(new Error(`Unknown importer build strategy name: ${options.strategy}`), 'ERR_BAD_STRATEGY')
   }
 
-  const roots = await all(dagBuilder(buildFile(file, source, ipld, options), reduce(file, ipld, options), options.builderOptions))
+  const roots = await all(dagBuilder(buildFileBatch(file, source, ipld, options), reduce(file, ipld, options), options))
 
   if (roots.length > 1) {
     throw errCode(new Error('expected a maximum of 1 roots and got ' + roots.length), 'ETOOMANYROOTS')
```
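The refactor above is built on `it-parallel-batch`: `importBuffer` now yields thunks (functions returning promises) rather than results, and `parallelBatch` executes up to `options.blockWriteConcurrency` of them at once while still yielding results in source order. A self-contained sketch of the pattern, with a made-up job body:

```js
'use strict'

const parallelBatch = require('it-parallel-batch')

// Yield thunks so the consumer decides how many run at the same time
async function * jobs () {
  for (let i = 0; i < 20; i++) {
    yield async () => {
      // stand-in for hashing a block and writing it to the block store
      return `block-${i}`
    }
  }
}

async function main () {
  // Runs up to 10 thunks concurrently; results arrive in input order
  for await (const result of parallelBatch(jobs(), 10)) {
    console.log(result)
  }
}

main()
```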

src/dag-builder/file/trickle.js

+1 -1

```diff
@@ -1,6 +1,6 @@
 'use strict'
 
-const batch = require('async-iterator-batch')
+const batch = require('it-flat-batch')
 
 module.exports = function * trickleReduceToRoot (source, reduce, options) {
   yield trickleStream(source, reduce, options)
```

src/dag-builder/index.js

+3 -3

```diff
@@ -30,13 +30,13 @@ async function * dagBuilder (source, ipld, options) {
       }
     }
 
-      const chunker = createChunker(options.chunker, validateChunks(source), options.chunkerOptions)
+      const chunker = createChunker(options.chunker, validateChunks(source), options)
 
       // item is a file
-      yield fileBuilder(entry, chunker, ipld, options)
+      yield () => fileBuilder(entry, chunker, ipld, options)
     } else {
       // item is a directory
-      yield dirBuilder(entry, ipld, options)
+      yield () => dirBuilder(entry, ipld, options)
     }
   }
 }
```

src/dir-sharded.js

+2 -2

```diff
@@ -9,7 +9,7 @@ const multihashing = require('multihashing-async')
 const Dir = require('./dir')
 const persist = require('./utils/persist')
 const Bucket = require('hamt-sharding')
-const extend = require('deep-extend')
+const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })
 
 const hashFn = async function (value) {
   const hash = await multihashing(Buffer.from(value, 'utf8'), 'murmur3-128')
@@ -36,7 +36,7 @@ const defaultOptions = {
 
 class DirSharded extends Dir {
   constructor (props, options) {
-    options = extend({}, defaultOptions, options)
+    options = mergeOptions(defaultOptions, options)
 
     super(props, options)
 
```
src/index.js

+26 -63

```diff
@@ -1,78 +1,41 @@
 'use strict'
 
-const { superstruct } = require('superstruct')
 const dagBuilder = require('./dag-builder')
 const treeBuilder = require('./tree-builder')
-const mh = require('multihashes')
+const parallelBatch = require('it-parallel-batch')
+const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })
 
-const struct = superstruct({
-  types: {
-    codec: v => ['dag-pb', 'dag-cbor', 'raw'].includes(v),
-    hashAlg: v => Object.keys(mh.names).includes(v),
-    leafType: v => ['file', 'raw'].includes(v)
-  }
-})
-
-const ChunkerOptions = struct({
-  minChunkSize: 'number?',
-  maxChunkSize: 'number?',
-  avgChunkSize: 'number?',
-  window: 'number?',
-  polynomial: 'number?'
-}, {
-  maxChunkSize: 262144,
-  avgChunkSize: 262144,
-  window: 16,
-  polynomial: 17437180132763653 // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
-})
-
-const BuilderOptions = struct({
-  maxChildrenPerNode: 'number?',
-  layerRepeat: 'number?'
-}, {
-  maxChildrenPerNode: 174,
-  layerRepeat: 4
-})
-
-const Options = struct({
-  chunker: struct.enum(['fixed', 'rabin']),
-  rawLeaves: 'boolean?',
-  hashOnly: 'boolean?',
-  strategy: struct.enum(['balanced', 'flat', 'trickle']),
-  reduceSingleLeafToSelf: 'boolean?',
-  codec: 'codec?',
-  format: 'codec?',
-  hashAlg: 'hashAlg?',
-  leafType: 'leafType?',
-  cidVersion: 'number?',
-  progress: 'function?',
-  wrapWithDirectory: 'boolean?',
-  shardSplitThreshold: 'number?',
-  onlyHash: 'boolean?',
-  chunkerOptions: ChunkerOptions,
-  builderOptions: BuilderOptions,
-
-  wrap: 'boolean?',
-  pin: 'boolean?',
-  recursive: 'boolean?',
-  ignore: 'array?',
-  hidden: 'boolean?',
-  preload: 'boolean?'
-}, {
+const defaultOptions = {
   chunker: 'fixed',
-  strategy: 'balanced',
+  strategy: 'balanced', // 'flat', 'trickle'
   rawLeaves: false,
+  onlyHash: false,
   reduceSingleLeafToSelf: true,
   codec: 'dag-pb',
   hashAlg: 'sha2-256',
-  leafType: 'file',
+  leafType: 'file', // 'raw'
   cidVersion: 0,
   progress: () => () => {},
-  shardSplitThreshold: 1000
-})
+  shardSplitThreshold: 1000,
+  fileImportConcurrency: 50,
+  blockWriteConcurrency: 10,
+  minChunkSize: 262144,
+  maxChunkSize: 262144,
+  avgChunkSize: 262144,
+  window: 16,
+  polynomial: 17437180132763653, // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
+  maxChildrenPerNode: 174,
+  layerRepeat: 4,
+  wrapWithDirectory: false,
+  pin: true,
+  recursive: false,
+  ignore: null, // []
+  hidden: false,
+  preload: true
+}
 
 module.exports = async function * (source, ipld, options = {}) {
-  const opts = Options(options)
+  const opts = mergeOptions(defaultOptions, options)
 
   if (options.cidVersion > 0 && options.rawLeaves === undefined) {
     // if the cid version is 1 or above, use raw leaves as this is
@@ -93,10 +56,10 @@ module.exports = async function * (source, ipld, options = {}) {
   }
 
   if (options.format) {
-    options.codec = options.format
+    opts.codec = options.format
  }
 
-  for await (const entry of treeBuilder(dagBuilder(source, ipld, opts), ipld, opts)) {
+  for await (const entry of treeBuilder(parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency), ipld, opts)) {
     yield {
       cid: entry.cid,
       path: entry.path,
```
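The same thunk pattern is applied per file here: `dagBuilder` yields one function per entry, and the loop above drains them through `parallelBatch` with `opts.fileImportConcurrency`, so up to 50 files import at once while `treeBuilder` still sees entries in source order. A mock of that top-level pipeline (the entries are invented):

```js
'use strict'

const parallelBatch = require('it-parallel-batch')

// Stand-in for dagBuilder: one thunk per file entry
async function * mockDagBuilder () {
  for (const path of ['a.txt', 'b.txt', 'c.txt']) {
    yield async () => ({ path, cid: `cid-of-${path}` })
  }
}

async function main () {
  // Up to 50 entries resolve concurrently, results in source order
  for await (const entry of parallelBatch(mockDagBuilder(), 50)) {
    console.log(entry.path, entry.cid)
  }
}

main()
```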

src/tree-builder.js

+4 -1

```diff
@@ -5,7 +5,7 @@ const flatToShard = require('./flat-to-shard')
 const Dir = require('./dir')
 const toPathComponents = require('./utils/to-path-components')
 const errCode = require('err-code')
-const first = require('async-iterator-first')
+const first = require('it-first')
 
 async function addToTree (elem, tree, options) {
   const pathElems = toPathComponents(elem.path || '')
@@ -61,6 +61,9 @@ async function * treeBuilder (source, ipld, options) {
   }, options)
 
   for await (const entry of source) {
+    if (!entry) {
+      continue
+    }
     tree = await addToTree(entry, tree, options)
 
     yield entry
```

test/benchmark.spec.js

+2 -2

```diff
@@ -5,8 +5,8 @@ const importer = require('../src')
 
 const IPLD = require('ipld')
 const inMemory = require('ipld-in-memory')
-const bufferStream = require('async-iterator-buffer-stream')
-const all = require('async-iterator-all')
+const bufferStream = require('it-buffer-stream')
+const all = require('it-all')
 
 const REPEATS = 10
 const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB
```

test/builder-balanced.spec.js

+1 -1

```diff
@@ -5,7 +5,7 @@ const chai = require('chai')
 chai.use(require('dirty-chai'))
 const expect = chai.expect
 const builder = require('../src/dag-builder/file/balanced')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 function reduce (leaves) {
   if (leaves.length > 1) {
```

test/builder-dir-sharding.spec.js

+2 -2

```diff
@@ -9,8 +9,8 @@ chai.use(require('dirty-chai'))
 const expect = chai.expect
 const IPLD = require('ipld')
 const inMemory = require('ipld-in-memory')
-const all = require('async-iterator-all')
-const last = require('async-iterator-last')
+const all = require('it-all')
+const last = require('it-last')
 
 describe('builder: directory sharding', () => {
   let ipld
```
