diff --git a/package.json b/package.json
index 43c5a4d..2a22065 100644
--- a/package.json
+++ b/package.json
@@ -40,16 +40,21 @@
     "aegir": "^20.0.0",
     "async-iterator-buffer-stream": "^1.0.0",
     "async-iterator-last": "^1.0.0",
+    "benchmark": "^2.1.4",
     "chai": "^4.2.0",
-    "cids": "~0.7.1",
     "detect-node": "^2.0.4",
     "dirty-chai": "^2.0.1",
+    "ipfs-block-service": "^0.16.0",
+    "ipfs-repo": "^0.28.0",
     "ipfs-unixfs-exporter": "~0.37.0",
-    "ipld": "^0.25.0",
+    "ipld": "ipld/js-ipld#feat/batch-put",
     "ipld-in-memory": "^3.0.0",
+    "iso-random-stream": "^1.1.1",
+    "lodash": "^4.17.15",
     "multihashes": "~0.4.14",
     "nyc": "^14.0.0",
-    "sinon": "^7.1.0"
+    "sinon": "^7.1.0",
+    "tempy": "^0.3.0"
   },
   "dependencies": {
     "async-iterator-all": "^1.0.0",
@@ -59,11 +64,13 @@
     "deep-extend": "~0.6.0",
     "err-code": "^2.0.0",
     "hamt-sharding": "~0.0.2",
+    "ipfs-block": "^0.8.1",
     "ipfs-unixfs": "~0.1.16",
     "ipld-dag-pb": "^0.18.0",
     "multicodec": "~0.5.1",
     "multihashing-async": "~0.7.0",
     "rabin-wasm": "~0.0.8",
+    "set-interval-async": "^1.0.29",
     "superstruct": "~0.6.1"
   },
   "contributors": [
diff --git a/src/dag-builder/file/index.js b/src/dag-builder/file/index.js
index a3fb565..3f1ac79 100644
--- a/src/dag-builder/file/index.js
+++ b/src/dag-builder/file/index.js
@@ -1,7 +1,12 @@
 'use strict'
 
+const Block = require('ipfs-block')
 const errCode = require('err-code')
 const UnixFS = require('ipfs-unixfs')
+const mh = require('multihashes')
+const mc = require('multicodec')
+const { setIntervalAsync } = require('set-interval-async/dynamic')
+const { clearIntervalAsync } = require('set-interval-async')
 const persist = require('../../utils/persist')
 const {
   DAGNode,
@@ -25,13 +30,10 @@ async function * buildFile (source, ipld, options) {
     let node
     let unixfs
 
-    const opts = {
-      ...options
-    }
+    const opts = { ...options }
 
     if (options.rawLeaves) {
       node = buffer
 
-      opts.codec = 'raw'
       opts.cidVersion = 1
     } else {
@@ -64,6 +66,83 @@
   }
 }
 
+const serialize = (node, ipld, options) => {
+  if (!options.codec && node.length) {
+    options.cidVersion = 1
+    options.codec = 'raw'
+  }
+
+  if (isNaN(options.hashAlg)) {
+    options.hashAlg = mh.names[options.hashAlg]
+  }
+
+  if (options.hashAlg !== mh.names['sha2-256']) {
+    options.cidVersion = 1
+  }
+
+  if (options.format) {
+    options.codec = options.format
+  }
+
+  const format = mc[options.codec.toUpperCase().replace(/-/g, '_')]
+
+  return ipld.serialize(node, format, options)
+}
+
+async function * buildFileBatch (source, ipld, options) {
+  let count = -1
+  let previous
+  let nodesToPersist = []
+
+  const timer = setIntervalAsync(async () => {
+    const temp = nodesToPersist
+    nodesToPersist = []
+    await ipld.putBatch(temp, options)
+  }, options.batchInterval)
+
+  for await (const buffer of source) {
+    count++
+    options.progress(buffer.length)
+    let node
+    let unixfs
+    const opts = { ...options }
+    if (options.rawLeaves) {
+      node = buffer
+      opts.codec = 'raw'
+      opts.cidVersion = 1
+    } else {
+      unixfs = new UnixFS(options.leafType, buffer)
+      node = new DAGNode(unixfs.marshal())
+    }
+
+    const result = await serialize(node, ipld, opts)
+    nodesToPersist.push(new Block(result[1], result[0]))
+
+    const entry = {
+      cid: result[0],
+      unixfs,
+      node
+    }
+    if (count === 0) {
+      previous = entry
+      continue
+    } else if (count === 1) {
+      yield previous
+      previous = null
+    }
+
+    yield entry
+  }
+  // Wait for everything to be saved
+  await clearIntervalAsync(timer)
+  await ipld.putBatch(nodesToPersist, options)
+
+  if (previous) {
+    previous.single = true
+    yield previous
+  }
+}
+
 const reduce = (file, ipld, options) => {
   return async function (leaves) {
     if (leaves.length === 1 && leaves[0].single && options.reduceSingleLeafToSelf) {
@@ -132,7 +211,11 @@ const fileBuilder = async (file, source, ipld, options) => {
     throw errCode(new Error(`Unknown importer build strategy name: ${options.strategy}`), 'ERR_BAD_STRATEGY')
   }
 
-  const roots = await all(dagBuilder(buildFile(source, ipld, options), reduce(file, ipld, options), options.builderOptions))
+  const roots = await all(dagBuilder(
+    options.batch ? buildFileBatch(source, ipld, options) : buildFile(source, ipld, options),
+    reduce(file, ipld, options),
+    options.builderOptions
+  ))
 
   if (roots.length > 1) {
     throw errCode(new Error('expected a maximum of 1 roots and got ' + roots.length), 'ETOOMANYROOTS')
diff --git a/src/index.js b/src/index.js
index 4ff621d..75a01c6 100644
--- a/src/index.js
+++ b/src/index.js
@@ -38,6 +38,8 @@ const Options = struct({
   chunker: struct.enum(['fixed', 'rabin']),
   rawLeaves: 'boolean?',
   hashOnly: 'boolean?',
+  batch: 'boolean?',
+  batchInterval: 'number?',
   strategy: struct.enum(['balanced', 'flat', 'trickle']),
   reduceSingleLeafToSelf: 'boolean?',
   codec: 'codec?',
@@ -62,6 +64,8 @@
   chunker: 'fixed',
   strategy: 'balanced',
   rawLeaves: false,
+  batch: true,
+  batchInterval: 50,
   reduceSingleLeafToSelf: true,
   codec: 'dag-pb',
   hashAlg: 'sha2-256',
diff --git a/test/benchmark.spec.js b/test/benchmark.js
similarity index 96%
rename from test/benchmark.spec.js
rename to test/benchmark.js
index 1c96013..b0b1db2 100644
--- a/test/benchmark.spec.js
+++ b/test/benchmark.js
@@ -24,7 +24,7 @@ describe.skip('benchmark', function () {
   const times = []
 
   after(() => {
-    console.info(`Percent\tms`) // eslint-disable-line no-console
+    console.info('Percent\tms') // eslint-disable-line no-console
     times.forEach((time, index) => {
       console.info(`${index}\t${parseInt(time / REPEATS)}`) // eslint-disable-line no-console
     })
diff --git a/test/test-speed.browser.js b/test/test-speed.browser.js
new file mode 100644
index 0000000..8e6b3a1
--- /dev/null
+++ b/test/test-speed.browser.js
@@ -0,0 +1,254 @@
+/* eslint-disable no-console */
+/* eslint-env mocha */
+'use strict'
+
+const importer = require('../src')
+const IPLD = require('ipld')
+const bufferStream = require('async-iterator-buffer-stream')
+const all = require('async-iterator-all')
+const randomBuffer = require('iso-random-stream/src/random')
+const IPFSRepo = require('ipfs-repo')
+const BlockService = require('ipfs-block-service')
+
+// Run benchmark.js in the browser: https://github.com/bestiejs/benchmark.js/issues/128#issuecomment-271615298
+// The Karma webpack config needs to be changed according to the above link
+const _ = require('lodash')
+const process = require('process')
+const benchmark = require('benchmark')
+const Benchmark = benchmark.runInContext({ _, process })
+window.Benchmark = Benchmark
+
+const createIPLD = async (opts) => {
+  const repo = new IPFSRepo(`repo-${Math.random()}`, opts)
+  const blockService = new BlockService(repo)
+  await repo.init({})
+  await repo.open()
+  return new IPLD({ blockService })
+}
+
+const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB
+const CHUNK_SIZE = 65536
+
+describe('benchmark', function () {
+  this.timeout(0)
+  it('single run 500mb without batch', async () => { // eslint-disable-line no-loop-func
+    const ipld = await createIPLD()
+    const times = []
+    let read = 0
+    let lastDate = Date.now()
+    let lastPercent = 0
+
+    const options = {
+      batch: false,
+      progress: (prog) => {
+        read += prog
+        const percent = parseInt((read / FILE_SIZE) * 100)
+        if (percent > lastPercent) {
+          times[percent] = (times[percent] || 0) + (Date.now() - lastDate)
+          lastDate = Date.now()
+          lastPercent = percent
+        }
+      }
+    }
+
+    await all(importer([{
+      path: 'single-500.txt',
+      content: bufferStream(FILE_SIZE, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return randomBuffer(CHUNK_SIZE)
+        }
+      })
+    }], ipld, options))
+    // console.info('Percent\tms') // eslint-disable-line no-console
+    // times.forEach((time, index) => {
+    //   console.info(`${index}\t${parseInt(time)}`) // eslint-disable-line no-console
+    // })
+  })
+
+  it('single run 500mb with batch 50ms', async () => { // eslint-disable-line no-loop-func
+    const ipld = await createIPLD()
+
+    const options = {
+      batchInterval: 50,
+      batch: true
+    }
+
+    await all(importer([{
+      path: 'single-500-batch-50.txt',
+      content: bufferStream(FILE_SIZE, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return randomBuffer(CHUNK_SIZE)
+        }
+      })
+    }], ipld, options))
+  })
+
+  it('single run 500mb with batch 100ms', async () => { // eslint-disable-line no-loop-func
+    const ipld = await createIPLD()
+
+    const options = {
+      batchInterval: 100,
+      batch: true
+    }
+
+    await all(importer([{
+      path: 'single-500-batch.txt',
+      content: bufferStream(FILE_SIZE, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return randomBuffer(CHUNK_SIZE)
+        }
+      })
+    }], ipld, options))
+  })
+
+  it('single run 500mb with batch 150ms', async () => { // eslint-disable-line no-loop-func
+    const ipld = await createIPLD()
+
+    const options = {
+      batchInterval: 150,
+      batch: true
+    }
+
+    await all(importer([{
+      path: 'single-500-batch-150.txt',
+      content: bufferStream(FILE_SIZE, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return randomBuffer(CHUNK_SIZE)
+        }
+      })
+    }], ipld, options))
+  })
+
+  it('single run 500mb with batch 200ms', async () => { // eslint-disable-line no-loop-func
+    const ipld = await createIPLD()
+
+    const options = {
+      batchInterval: 200,
+      batch: true
+    }
+
+    await all(importer([{
+      path: 'single-500-batch-200.txt',
+      content: bufferStream(FILE_SIZE, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return randomBuffer(CHUNK_SIZE)
+        }
+      })
+    }], ipld, options))
+  })
+
+  const sizes = [10, 50, 100]
+
+  for (const size of sizes) {
+    it(`benchmark ${size}mb`, (done) => {
+      const suite = new Benchmark.Suite()
+      const FILE_SIZE = Math.pow(2, 20) * size
+
+      suite
+        .add('without batch', {
+          defer: true,
+          fn: async (deferred) => {
+            const ipld = await createIPLD()
+            await all(importer(
+              [{
+                path: '200Bytes.txt',
+                content: bufferStream(FILE_SIZE, {
+                  chunkSize: CHUNK_SIZE,
+                  generator: () => randomBuffer(CHUNK_SIZE)
+                })
+              }],
+              ipld,
+              { batch: false }
+            ))
+
+            deferred.resolve()
+          }
+        })
+        .add('batch 50ms ', {
+          defer: true,
+          fn: async (deferred) => {
+            const ipld = await createIPLD()
+            await all(importer(
+              [{
+                path: 'batch100.txt',
+                content: bufferStream(FILE_SIZE, {
+                  chunkSize: CHUNK_SIZE,
+                  generator: () => randomBuffer(CHUNK_SIZE)
+                })
+              }],
+              ipld,
+              { batch: true, batchInterval: 50 }
+            ))
+
+            deferred.resolve()
+          }
+        })
+        .add('batch 100ms ', {
+          defer: true,
+          fn: async (deferred) => {
+            const ipld = await createIPLD()
+            await all(importer(
+              [{
+                path: 'batch100.txt',
+                content: bufferStream(FILE_SIZE, {
+                  chunkSize: CHUNK_SIZE,
+                  generator: () => randomBuffer(CHUNK_SIZE)
+                })
+              }],
+              ipld,
+              { batch: true, batchInterval: 100 }
+            ))
+
+            deferred.resolve()
+          }
+        })
+        .add('batch 150ms ', {
+          defer: true,
+          fn: async (deferred) => {
+            const ipld = await createIPLD()
+            await all(importer(
+              [{
+                path: 'batch100.txt',
+                content: bufferStream(FILE_SIZE, {
+                  chunkSize: CHUNK_SIZE,
+                  generator: () => randomBuffer(CHUNK_SIZE)
+                })
+              }],
+              ipld,
+              { batch: true, batchInterval: 150 }
+            ))
+
+            deferred.resolve()
+          }
+        })
+        .add('batch 200ms ', {
+          defer: true,
+          fn: async (deferred) => {
+            const ipld = await createIPLD()
+            await all(importer(
+              [{
+                path: 'batch100.txt',
+                content: bufferStream(FILE_SIZE, {
+                  chunkSize: CHUNK_SIZE,
+                  generator: () => randomBuffer(CHUNK_SIZE)
+                })
+              }],
+              ipld,
+              { batch: true, batchInterval: 200 }
+            ))
+
+            deferred.resolve()
+          }
+        })
+        .on('cycle', function (event) {
+          console.log(String(event.target))
+        })
+        .on('complete', function () {
+          console.log('Fastest is ' + this.filter('fastest').map('name'))
+          done()
+        })
+        .run({ async: true })
+    })
+  }
+})
diff --git a/test/test-speed.js b/test/test-speed.js
new file mode 100644
index 0000000..5cf3239
--- /dev/null
+++ b/test/test-speed.js
@@ -0,0 +1,252 @@
+/* eslint-disable no-console */
+/* eslint-env mocha */
+'use strict'
+
+const importer = require('../src')
+const IPLD = require('ipld')
+const bufferStream = require('async-iterator-buffer-stream')
+const all = require('async-iterator-all')
+const randomBuffer = require('iso-random-stream/src/random')
+const IPFSRepo = require('ipfs-repo')
+const BlockService = require('ipfs-block-service')
+const tempy = require('tempy')
+const Benchmark = require('benchmark')
+
+const createIPLD = async (opts) => {
+  const repo = new IPFSRepo(tempy.directory(), opts)
+  const blockService = new BlockService(repo)
+  await repo.init({})
+  await repo.open()
+  return new IPLD({ blockService })
+}
+
+const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB
+const CHUNK_SIZE = 65536
+
+describe('benchmark', function () {
+  this.timeout(0)
+  it('single run 500mb without batch', async () => { // eslint-disable-line no-loop-func
+    const ipld = await createIPLD()
+    const times = []
+    let read = 0
+    let lastDate = Date.now()
+    let lastPercent = 0
+
+    const options = {
+      batchInterval: 50,
+      batch: false,
+      progress: (prog) => {
+        read += prog
+        const percent = parseInt((read / FILE_SIZE) * 100)
+        if (percent > lastPercent) {
+          times[percent] = (times[percent] || 0) + (Date.now() - lastDate)
+          lastDate = Date.now()
+          lastPercent = percent
+        }
+      }
+    }
+
+    await all(importer([{
+      path: 'single-500.txt',
+      content: bufferStream(FILE_SIZE, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return randomBuffer(CHUNK_SIZE)
+        }
+      })
+    }], ipld, options))
+    // console.info('Percent\tms') // eslint-disable-line no-console
+    // times.forEach((time, index) => {
+    //   console.info(`${index}\t${parseInt(time)}`) // eslint-disable-line no-console
+    // })
+  })
+
+  it('single run 500mb with batch 50ms', async () => { // eslint-disable-line no-loop-func
+    const ipld = await createIPLD()
+
+    const options = {
+      batchInterval: 50,
+      batch: true
+    }
+
+    await all(importer([{
+      path: 'single-500-batch-50.txt',
+      content: bufferStream(FILE_SIZE, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return randomBuffer(CHUNK_SIZE)
+        }
+      })
+    }], ipld, options))
+  })
+
+  it('single run 500mb with batch 100ms', async () => { // eslint-disable-line no-loop-func
+    const ipld = await createIPLD()
+
+    const options = {
+      batchInterval: 100,
+      batch: true
+    }
+
+    await all(importer([{
+      path: 'single-500-batch.txt',
+      content: bufferStream(FILE_SIZE, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return randomBuffer(CHUNK_SIZE)
+        }
+      })
+    }], ipld, options))
+  })
+
+  it('single run 500mb with batch 150ms', async () => { // eslint-disable-line no-loop-func
+    const ipld = await createIPLD()
+
+    const options = {
+      batchInterval: 150,
+      batch: true
+    }
+
+    await all(importer([{
+      path: 'single-500-batch-150.txt',
+      content: bufferStream(FILE_SIZE, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return randomBuffer(CHUNK_SIZE)
+        }
+      })
+    }], ipld, options))
+  })
+
+  it('single run 500mb with batch 200ms', async () => { // eslint-disable-line no-loop-func
+    const ipld = await createIPLD()
+
+    const options = {
+      batchInterval: 200,
+      batch: true
+    }
+
+    await all(importer([{
+      path: 'single-500-batch-200.txt',
+      content: bufferStream(FILE_SIZE, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return randomBuffer(CHUNK_SIZE)
+        }
+      })
+    }], ipld, options))
+  })
+
+  const sizes = [10, 50, 100, 200]
+
+  for (const size of sizes) {
+    it(`benchmark ${size}mb`, (done) => {
+      const suite = new Benchmark.Suite()
+      const FILE_SIZE = Math.pow(2, 20) * size
+
+      suite
+        .add('without batch', {
+          defer: true,
+          fn: async (deferred) => {
+            const ipld = await createIPLD()
+            await all(importer(
+              [{
+                path: '200Bytes.txt',
+                content: bufferStream(FILE_SIZE, {
+                  chunkSize: CHUNK_SIZE,
+                  generator: () => randomBuffer(CHUNK_SIZE)
+                })
+              }],
+              ipld,
+              { batch: false }
+            ))
+
+            deferred.resolve()
+          }
+        })
+        .add('batch 50ms ', {
+          defer: true,
+          fn: async (deferred) => {
+            const ipld = await createIPLD()
+            await all(importer(
+              [{
+                path: 'batch100.txt',
+                content: bufferStream(FILE_SIZE, {
+                  chunkSize: CHUNK_SIZE,
+                  generator: () => randomBuffer(CHUNK_SIZE)
+                })
+              }],
+              ipld,
+              { batch: true, batchInterval: 50 }
+            ))
+
+            deferred.resolve()
+          }
+        })
+        .add('batch 100ms ', {
+          defer: true,
+          fn: async (deferred) => {
+            const ipld = await createIPLD()
+            await all(importer(
+              [{
+                path: 'batch100.txt',
+                content: bufferStream(FILE_SIZE, {
+                  chunkSize: CHUNK_SIZE,
+                  generator: () => randomBuffer(CHUNK_SIZE)
+                })
+              }],
+              ipld,
+              { batch: true, batchInterval: 100 }
+            ))
+
+            deferred.resolve()
+          }
+        })
+        .add('batch 150ms ', {
+          defer: true,
+          fn: async (deferred) => {
+            const ipld = await createIPLD()
+            await all(importer(
+              [{
+                path: 'batch100.txt',
+                content: bufferStream(FILE_SIZE, {
+                  chunkSize: CHUNK_SIZE,
+                  generator: () => randomBuffer(CHUNK_SIZE)
+                })
+              }],
+              ipld,
+              { batch: true, batchInterval: 150 }
+            ))
+
+            deferred.resolve()
+          }
+        })
+        .add('batch 200ms ', {
+          defer: true,
+          fn: async (deferred) => {
+            const ipld = await createIPLD()
+            await all(importer(
+              [{
+                path: 'batch100.txt',
+                content: bufferStream(FILE_SIZE, {
+                  chunkSize: CHUNK_SIZE,
+                  generator: () => randomBuffer(CHUNK_SIZE)
+                })
+              }],
+              ipld,
+              { batch: true, batchInterval: 200 }
+            ))
+
+            deferred.resolve()
+          }
+        })
+        .on('cycle', function (event) {
+          console.log(String(event.target))
+        })
+        .on('complete', function () {
+          console.log('Fastest is ' + this.filter('fastest').map('name'))
+          done()
+        })
+        .run({ async: true })
+    })
+  }
+})
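A minimal usage sketch of the new `batch` / `batchInterval` options introduced by this patch. It mirrors the `all(importer(files, ipld, options))` calls in the test files above, but swaps the full repo setup for the `ipld-in-memory` helper already in the devDependencies; the file path and content are illustrative only, and `batch: true` / `batchInterval: 50` are the defaults added in `src/index.js`, shown explicitly here:

```js
'use strict'

const importer = require('../src') // the importer from this repo
const IPLD = require('ipld')
const inMemory = require('ipld-in-memory')
const all = require('async-iterator-all')

async function main () {
  // In-memory IPLD instance, as used elsewhere in this repo's tests
  const ipld = await inMemory(IPLD)

  // With batch enabled (the new default), serialized blocks are buffered
  // and flushed via ipld.putBatch() every `batchInterval` milliseconds
  // instead of being written to the block store one at a time.
  const entries = await all(importer([{
    path: 'hello.txt', // illustrative path/content
    content: Buffer.from('hello world')
  }], ipld, { batch: true, batchInterval: 50 }))

  console.log(entries[0].cid.toString())
}

main()
```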