diff --git a/package.json b/package.json index dcd9edfceb..e3d3898482 100644 --- a/package.json +++ b/package.json @@ -74,7 +74,7 @@ "expose-loader": "^0.7.3", "form-data": "^2.3.1", "gulp": "^3.9.1", - "interface-ipfs-core": "~0.31.19", + "interface-ipfs-core": "~0.32.1", "ipfsd-ctl": "~0.23.0", "left-pad": "^1.1.3", "lodash": "^4.17.4", @@ -93,16 +93,18 @@ "async": "^2.5.0", "bl": "^1.2.1", "boom": "^5.2.0", - "debug": "^3.0.1", + "byteman": "^1.3.5", "cids": "^0.5.1", + "debug": "^3.0.1", "file-type": "^6.1.0", "filesize": "^3.5.10", "fsm-event": "^2.1.0", + "get-folder-size": "^1.0.0", "glob": "^7.1.2", "hapi": "^16.5.2", "hapi-set-header": "^1.0.2", "hoek": "^4.2.0", - "ipfs-api": "^14.3.5", + "ipfs-api": "^14.3.7", "ipfs-bitswap": "~0.17.2", "ipfs-block": "~0.6.0", "ipfs-block-service": "~0.12.0", @@ -138,8 +140,11 @@ "peer-book": "~0.5.1", "peer-id": "~0.10.1", "peer-info": "~0.11.0", + "progress": "^2.0.0", "promisify-es6": "^1.0.3", + "pull-abortable": "^4.1.1", "pull-file": "^1.0.0", + "pull-ndjson": "^0.1.1", "pull-paramap": "^1.2.2", "pull-pushable": "^2.1.1", "pull-sort": "^1.0.1", @@ -216,4 +221,4 @@ "Łukasz Magiera ", "ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ " ] -} \ No newline at end of file +} diff --git a/src/cli/commands/files/add.js b/src/cli/commands/files/add.js index 82fe7ecb18..805f80217f 100644 --- a/src/cli/commands/files/add.js +++ b/src/cli/commands/files/add.js @@ -8,8 +8,12 @@ const pull = require('pull-stream') const paramap = require('pull-paramap') const zip = require('pull-zip') const toPull = require('stream-to-pull-stream') +const getFolderSize = require('get-folder-size') +const byteman = require('byteman') +const waterfall = require('async/waterfall') const utils = require('../../utils') const print = require('../../utils').print +const createProgressBar = require('../../utils').createProgressBar const WRAPPER = 'wrapper/' @@ -40,6 +44,14 @@ function checkPath (inPath, recursive) { return inPath } +function getTotalBytes (path, recursive, cb) { + if (recursive) { + getFolderSize(path, cb) + } else { + fs.stat(path, (err, stat) => cb(err, stat.size)) + } +} + function addPipeline (index, addStream, list, argv) { const { wrapWithDirectory, @@ -47,7 +59,6 @@ function addPipeline (index, addStream, list, argv) { quieter, silent } = argv - pull( zip( pull.values(list), @@ -102,6 +113,12 @@ module.exports = { describe: 'Add a file to IPFS using the UnixFS data format', builder: { + progress: { + alias: 'p', + type: 'boolean', + default: true, + describe: 'Stream progress data' + }, recursive: { alias: 'r', type: 'boolean', @@ -185,34 +202,45 @@ module.exports = { } const ipfs = argv.ipfs - // TODO: revist when interface-ipfs-core exposes pull-streams - let createAddStream = (cb) => { - ipfs.files.createAddStream(options, (err, stream) => { - cb(err, err ? null : toPull.transform(stream)) - }) - } - - if (typeof ipfs.files.createAddPullStream === 'function') { - createAddStream = (cb) => { - cb(null, ipfs.files.createAddPullStream(options)) - } - } + let list = [] + let currentBytes = 0 + + waterfall([ + (next) => glob(path.join(inPath, '/**/*'), next), + (globResult, next) => { + list = globResult.length === 0 ? [inPath] : globResult + + getTotalBytes(inPath, argv.recursive, next) + }, + (totalBytes, next) => { + if (argv.progress) { + const bar = createProgressBar(totalBytes) + options.progress = function (byteLength) { + currentBytes += byteLength + bar.tick(byteLength, {progress: byteman(currentBytes, 2, 'MB')}) + } + } - createAddStream((err, addStream) => { - if (err) { - throw err - } + // TODO: revist when interface-ipfs-core exposes pull-streams - glob(path.join(inPath, '/**/*'), (err, list) => { - if (err) { - throw err + let createAddStream = (cb) => { + ipfs.files.createAddStream(options, (err, stream) => { + cb(err, err ? null : toPull.transform(stream)) + }) } - if (list.length === 0) { - list = [inPath] + + if (typeof ipfs.files.createAddPullStream === 'function') { + createAddStream = (cb) => { + cb(null, ipfs.files.createAddPullStream(options)) + } } - addPipeline(index, addStream, list, argv) - }) + createAddStream(next) + } + ], (err, addStream) => { + if (err) throw err + + addPipeline(index, addStream, list, argv) }) } } diff --git a/src/cli/utils.js b/src/cli/utils.js index 76ab67d969..4565fc316e 100644 --- a/src/cli/utils.js +++ b/src/cli/utils.js @@ -9,6 +9,8 @@ const path = require('path') const debug = require('debug') const log = debug('cli') log.error = debug('cli:error') +const Progress = require('progress') +const byteman = require('byteman') exports = module.exports @@ -85,3 +87,16 @@ exports.print = (msg, newline) => { process.stdout.write(msg) } } + +exports.createProgressBar = (totalBytes) => { + const total = byteman(totalBytes, 2, 'MB') + const barFormat = `:progress / ${total} [:bar] :percent :etas` + + // 16 MB / 34 MB [=========== ] 48% 5.8s // + return new Progress(barFormat, { + incomplete: ' ', + clear: true, + stream: process.stdout, + total: totalBytes + }) +} diff --git a/src/core/components/files.js b/src/core/components/files.js index 6a9f5fbced..7dc619bf9c 100644 --- a/src/core/components/files.js +++ b/src/core/components/files.js @@ -54,7 +54,7 @@ module.exports = function files (self) { add: promisify((data, options, callback) => { if (typeof options === 'function') { callback = options - options = undefined + options = {} } else if (!callback || typeof callback !== 'function') { callback = noop } @@ -65,6 +65,14 @@ module.exports = function files (self) { return callback(new Error('Invalid arguments, data must be an object, Buffer or readable stream')) } + let total = 0 + let prog = options.progress || (() => {}) + const progress = (bytes) => { + total += bytes + prog(total) + } + + options.progress = progress pull( pull.values(normalizeContent(data)), importer(self._ipldResolver, options), diff --git a/src/http/api/resources/files.js b/src/http/api/resources/files.js index 35c53a32e6..64466b1622 100644 --- a/src/http/api/resources/files.js +++ b/src/http/api/resources/files.js @@ -9,9 +9,10 @@ log.error = debug('jsipfs:http-api:files:error') const pull = require('pull-stream') const toPull = require('stream-to-pull-stream') const pushable = require('pull-pushable') -const EOL = require('os').EOL const toStream = require('pull-stream-to-stream') +const abortable = require('pull-abortable') const Joi = require('joi') +const ndjson = require('pull-ndjson') exports = module.exports @@ -104,7 +105,7 @@ exports.get = { pull( stream, pull.asyncMap((file, cb) => { - const header = {name: file.path} + const header = { name: file.path } if (!file.content) { header.type = 'directory' pack.entry(header) @@ -207,9 +208,47 @@ exports.add = { fileAdder.end() }) + const replyStream = pushable() + let total = 0 + const progressHandler = (bytes) => { + total += bytes + replyStream.push({ Bytes: total }) + } + const options = { 'cid-version': request.query['cid-version'], - 'raw-leaves': request.query['raw-leaves'] + 'raw-leaves': request.query['raw-leaves'], + progress: request.query['progress'] ? progressHandler : null + } + + const aborter = abortable() + const stream = toStream.source(pull( + replyStream, + aborter, + ndjson.serialize() + )) + + // const stream = toStream.source(replyStream.source) + // hapi is not very clever and throws if no + // - _read method + // - _readableState object + // are there :( + if (!stream._read) { + stream._read = () => {} + stream._readableState = {} + stream.unpipe = () => {} + } + reply(stream) + .header('x-chunked-output', '1') + .header('content-type', 'application/json') + .header('Trailer', 'X-Stream-Error') + + function _writeErr (msg, code) { + const err = JSON.stringify({ Message: msg, Code: code }) + request.raw.res.addTrailers({ + 'X-Stream-Error': err + }) + return aborter.abort() } pull( @@ -218,28 +257,21 @@ exports.add = { pull.map((file) => { return { Name: file.path ? file.path : file.hash, - Hash: file.hash + Hash: file.hash, + Size: file.size } }), - pull.map((file) => JSON.stringify(file) + EOL), pull.collect((err, files) => { if (err) { - return reply({ - Message: err, - Code: 0 - }).code(500) + return _writeErr(err, 0) } if (files.length === 0 && filesParsed) { - return reply({ - Message: 'Failed to add files.', - Code: 0 - }).code(500) + return _writeErr('Failed to add files.', 0) } - reply(files.join('\n')) - .header('x-chunked-output', '1') - .header('content-type', 'application/json') + files.forEach((f) => replyStream.push(f)) + replyStream.end() }) ) } diff --git a/test/cli/files.js b/test/cli/files.js index afcf581112..93da6eb59f 100644 --- a/test/cli/files.js +++ b/test/cli/files.js @@ -106,6 +106,14 @@ describe('files', () => runOnAndOff((thing) => { ipfs = thing.ipfs }) + it('add with progress', () => { + return ipfs('files add -p src/init-files/init-docs/readme') + .then((out) => { + expect(out) + .to.eql('added QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB readme\n') + }) + }) + it('add', () => { return ipfs('files add src/init-files/init-docs/readme') .then((out) => { diff --git a/test/cli/progress-bar.js b/test/cli/progress-bar.js new file mode 100644 index 0000000000..21708a59f8 --- /dev/null +++ b/test/cli/progress-bar.js @@ -0,0 +1,15 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const createProgressBar = require('../../src/cli/utils').createProgressBar + +describe('progress bar', () => { + it('created with the correct properties', () => { + const total = 1000 + + const bar = createProgressBar(total) + expect(bar.total).to.eql(total) + expect(typeof bar.tick).to.eql('function') + }) +})