From 879f9a86bacb1f0d48896ddb312c2f0b43355958 Mon Sep 17 00:00:00 2001
From: Stephen Whitmore
Date: Tue, 7 Jun 2016 12:12:41 -0700
Subject: [PATCH 1/4] End export stream on completion.
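
Previously the exporter's object stream never ended: dagService.get was
called once for the root node and the stream stayed open, which forced
consumers (and the tests) to poll with setTimeout. The DAG is now walked
with field-trip, and push(null) is called once traversal completes, so
'end' fires and the stream can be piped. A minimal consumption sketch
(the hash, ds, and require path are placeholders mirroring the tests):

    const concat = require('concat-stream')
    const exporter = require('./src/exporter')

    // ds is a DAGService and hash a base58 multihash, as in the tests.
    exporter(hash, ds)
      .on('error', (err) => console.error(err))
      .pipe(concat((files) => {
        // concat-stream's callback fires only because the exporter now
        // ends its stream; files is an array of { path, content }.
        files.forEach((file) => console.log(file.path))
      }))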
---
 package.json          |  4 +-
 src/exporter.js       | 90 +++++++++++++++++++++----------------
 test/test-exporter.js | 46 +++++++++++++---------
 3 files changed, 75 insertions(+), 65 deletions(-)

diff --git a/package.json b/package.json
index cf0d3dbe..413527a4 100644
--- a/package.json
+++ b/package.json
@@ -35,10 +35,12 @@
   "homepage": "https://github.com/diasdavid/js-ipfs-data-importing#readme",
   "devDependencies": {
     "aegir": "^3.0.1",
+    "async": "^1.5.2",
     "block-stream2": "^1.1.0",
     "bs58": "^3.0.0",
     "buffer-loader": "0.0.1",
     "chai": "^3.5.0",
+    "concat-stream": "^1.5.1",
     "fs-blob-store": "^5.2.1",
     "idb-plus-blob-store": "^1.1.2",
     "ipfs-repo": "^0.7.5",
@@ -51,9 +53,9 @@
     "string-to-stream": "^1.0.1"
   },
   "dependencies": {
-    "async": "^1.5.2",
     "block-stream2": "^1.1.0",
     "debug": "^2.2.0",
+    "field-trip": "0.0.2",
     "ipfs-merkle-dag": "^0.5.0",
     "ipfs-unixfs": "^0.1.0",
     "isstream": "^0.1.2",
diff --git a/src/exporter.js b/src/exporter.js
index 3bfaaab0..239d2613 100644
--- a/src/exporter.js
+++ b/src/exporter.js
@@ -5,10 +5,10 @@ const log = debug('exporter')
 log.err = debug('exporter:error')
 const UnixFS = require('ipfs-unixfs')
 const series = require('run-series')
-const async = require('async')
 const Readable = require('readable-stream').Readable
 const pathj = require('path')
 const util = require('util')
+const fieldtrip = require('field-trip')
 
 exports = module.exports = Exporter
 
@@ -25,11 +25,12 @@ function Exporter (hash, dagService, options) {
 
   this._read = (n) => {}
 
-  let fileExporter = (node, name, callback) => {
+  let fileExporter = (node, name, done) => {
     let init
 
-    if (!callback) { callback = function noop () {} }
+    if (!done) throw new Error('done must be set')
 
+    // Logic to export a single (possibly chunked) unixfs file.
     var rs = new Readable()
     if (node.links.length === 0) {
       const unmarshaledData = UnixFS.unmarshal(node.data)
@@ -43,8 +44,7 @@ function Exporter (hash, dagService, options) {
         rs.push(null)
       }
       this.push({ content: rs, path: name })
-      callback()
-      return
+      done()
     } else {
       init = false
       rs._read = () => {
@@ -57,7 +57,7 @@ function Exporter (hash, dagService, options) {
         return (cb) => {
           dagService.get(link.hash, (err, res) => {
             if (err) {
-              cb(err)
+              return cb(err)
             }
             var unmarshaledData = UnixFS.unmarshal(res.data)
             rs.push(unmarshaledData.data)
@@ -67,26 +67,28 @@ function Exporter (hash, dagService, options) {
         })
         series(array, (err, res) => {
           if (err) {
-            callback()
+            rs.emit('error', err)
             return
           }
           rs.push(null)
-          callback()
           return
         })
       }
       this.push({ content: rs, path: name })
-      callback()
-      return
+      done()
     }
   }
 
-  let dirExporter = (node, name, callback) => {
+  // Logic to export a unixfs directory.
+  let dirExporter = (node, name, add, done) => {
     let init
 
-    if (!callback) { callback = function noop () {} }
+    if (!add) throw new Error('add must be set')
+    if (!done) throw new Error('done must be set')
 
     var rs = new Readable()
+
+    // Directory has no links
     if (node.links.length === 0) {
       init = false
       rs._read = () => {
@@ -98,49 +100,45 @@ function Exporter (hash, dagService, options) {
         rs.push(null)
       }
       this.push({content: null, path: name})
-      callback()
-      return
+      done()
     } else {
-      async.forEachSeries(node.links, (link, callback) => {
-        dagService.get(link.hash, (err, res) => {
-          if (err) {
-            callback(err)
-          }
-          var unmarshaledData = UnixFS.unmarshal(res.data)
-          if (unmarshaledData.type === 'file') {
-            return (fileExporter(res, pathj.join(name, link.name), callback))
-          }
-          if (unmarshaledData.type === 'directory') {
-            return (dirExporter(res, pathj.join(name, link.name), callback))
-          }
-          callback()
-        })
-      }, (err) => {
-        if (err) {
-          callback()
-          return
-        }
-        callback()
-        return
+      node.links.forEach((link) => {
+        add({ path: pathj.join(name, link.name), hash: link.hash })
       })
+      done()
     }
   }
 
-  dagService.get(hash, (err, fetchedNode) => {
+  // Traverse the DAG asynchronously
+  var self = this
+  fieldtrip([{ path: hash, hash: hash }], visit, (err) => {
     if (err) {
-      this.emit('error', err)
+      self.emit('error', err)
       return
     }
-    const data = UnixFS.unmarshal(fetchedNode.data)
-    const type = data.type
-
-    if (type === 'directory') {
-      dirExporter(fetchedNode, hash)
-    }
-    if (type === 'file') {
-      fileExporter(fetchedNode, hash)
-    }
+    self.push(null)
   })
 
+  // Visit function: called once per node in the exported graph
+  function visit (item, add, done) {
+    dagService.get(item.hash, (err, fetchedNode) => {
+      if (err) {
+        self.emit('error', err)
+        return
+      }
+
+      const data = UnixFS.unmarshal(fetchedNode.data)
+      const type = data.type
+
+      if (type === 'directory') {
+        dirExporter(fetchedNode, item.path, add, done)
+      }
+
+      if (type === 'file') {
+        fileExporter(fetchedNode, item.path, done)
+      }
+    })
+  }
+
   return this
 }
diff --git a/test/test-exporter.js b/test/test-exporter.js
index 8e4f79db..69474a2c 100644
--- a/test/test-exporter.js
+++ b/test/test-exporter.js
@@ -7,7 +7,7 @@ const expect = require('chai').expect
 const BlockService = require('ipfs-block-service')
 const DAGService = require('ipfs-merkle-dag').DAGService
 const UnixFS = require('ipfs-unixfs')
-const bl = require('bl')
+const concat = require('concat-stream')
 const fs = require('fs')
 const path = require('path')
@@ -32,13 +32,16 @@ module.exports = function (repo) {
       const unmarsh = UnixFS.unmarshal(fetchedNode.data)
       expect(err).to.not.exist
       const testExport = exporter(hash, ds)
-      testExport.on('data', (file) => {
-        file.content.pipe(bl((err, bldata) => {
-          expect(err).to.not.exist
+      testExport.on('error', (err) => {
+        expect(err).to.not.exist
+      })
+      testExport.pipe(concat((files) => {
+        expect(files).to.be.length(1)
+        files[0].content.pipe(concat((bldata) => {
          expect(bldata).to.deep.equal(unmarsh.data)
          done()
        }))
-      })
+      }))
     })
   })
@@ -47,10 +50,12 @@ module.exports = function (repo) {
     const bs = new BlockService(repo)
     const ds = new DAGService(bs)
     const testExport = exporter(hash, ds)
+    testExport.on('error', (err) => {
+      expect(err).to.not.exist
+    })
     testExport.on('data', (file) => {
-      file.content.pipe(bl((err, bldata) => {
+      file.content.pipe(concat((bldata) => {
         expect(bldata).to.deep.equal(bigFile)
-        expect(err).to.not.exist
         done()
       }))
     })
@@ -61,10 +66,13 @@ module.exports = function (repo) {
     const bs = new BlockService(repo)
     const ds = new DAGService(bs)
     const testExport = exporter(hash, ds)
+    testExport.on('error', (err) => {
+      expect(err).to.not.exist
+    })
     testExport.on('data', (file) => {
       expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
-      file.content.pipe(bl((err, bldata) => {
-        expect(err).to.not.exist
+      file.content.pipe(concat((bldata) => {
+        expect(bldata).to.exist
         done()
       }))
     })
@@ -75,17 +83,16 @@ module.exports = function (repo) {
     const bs = new BlockService(repo)
     const ds = new DAGService(bs)
     const testExport = exporter(hash, ds)
-    var fsa = []
-    testExport.on('data', (files) => {
-      fsa.push(files)
+    testExport.on('error', (err) => {
+      expect(err).to.not.exist
     })
-    setTimeout(() => {
-      expect(fsa[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
-      expect(fsa[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
-      expect(fsa[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
-      expect(fsa[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
+    testExport.pipe(concat((files) => {
+      expect(files[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
+      expect(files[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
+      expect(files[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
+      expect(files[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
       done()
-    }, 1000)
+    }))
   })
 
   it('returns a null stream for dir', (done) => {
@@ -93,6 +100,9 @@ module.exports = function (repo) {
     const bs = new BlockService(repo)
     const ds = new DAGService(bs)
     const testExport = exporter(hash, ds)
+    testExport.on('error', (err) => {
+      expect(err).to.not.exist
+    })
     testExport.on('data', (dir) => {
       expect(dir.content).to.equal(null)
       done()

From 41ef5425e824a6ba7b1532536631c9eec7acfefe Mon Sep 17 00:00:00 2001
From: Stephen Whitmore
Date: Tue, 14 Jun 2016 18:10:30 -0700
Subject: [PATCH 2/4] Sanitize multihash input to Exporter.

---
 package.json    |  2 ++
 src/exporter.js | 10 ++++++++++
 2 files changed, 12 insertions(+)

diff --git a/package.json b/package.json
index 413527a4..18549d1a 100644
--- a/package.json
+++ b/package.json
@@ -54,10 +54,12 @@
   },
   "dependencies": {
     "block-stream2": "^1.1.0",
+    "bs58": "^3.0.0",
     "debug": "^2.2.0",
     "field-trip": "0.0.2",
     "ipfs-merkle-dag": "^0.5.0",
     "ipfs-unixfs": "^0.1.0",
+    "is-ipfs": "^0.2.0",
     "isstream": "^0.1.2",
     "readable-stream": "^1.1.13",
     "run-series": "^1.1.4",
diff --git a/src/exporter.js b/src/exporter.js
index 239d2613..519079b6 100644
--- a/src/exporter.js
+++ b/src/exporter.js
@@ -3,6 +3,8 @@
 const debug = require('debug')
 const log = debug('exporter')
 log.err = debug('exporter:error')
+const isIPFS = require('is-ipfs')
+const bs58 = require('bs58')
 const UnixFS = require('ipfs-unixfs')
 const series = require('run-series')
 const Readable = require('readable-stream').Readable
@@ -19,6 +21,14 @@ function Exporter (hash, dagService, options) {
     return new Exporter(hash, dagService, options)
   }
 
+  // Sanitize hash.
+  if (!isIPFS.multihash(hash)) {
+    throw new Error('not valid multihash')
+  }
+  if (Buffer.isBuffer(hash)) {
+    hash = bs58.encode(hash)
+  }
+
   Readable.call(this, { objectMode: true })
 
   this.options = options || {}

From 461c5007d0019e665b93f886dede1abcbd5855ce Mon Sep 17 00:00:00 2001
From: Stephen Whitmore
Date: Tue, 14 Jun 2016 18:24:11 -0700
Subject: [PATCH 3/4] Output directories in Exporter.
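
Directories are now emitted as entries in their own right: dirExporter
pushes a { path, content: null } entry for every directory it visits,
root included, before queueing the directory's links for traversal, so
consumers can rebuild the full tree from the stream. A rough consumption
sketch (hash and ds are placeholders, as in the tests):

    exporter(hash, ds).on('data', (entry) => {
      if (entry.content === null) {
        console.log('dir: ', entry.path) // directory entry
      } else {
        console.log('file:', entry.path) // file entry with content stream
      }
    })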
---
 package.json          |  2 +-
 src/exporter.js       | 22 ++++------------------
 test/test-exporter.js | 10 ++++++----
 3 files changed, 11 insertions(+), 23 deletions(-)

diff --git a/package.json b/package.json
index 18549d1a..ce4c2d2b 100644
--- a/package.json
+++ b/package.json
@@ -56,7 +56,7 @@
     "block-stream2": "^1.1.0",
     "bs58": "^3.0.0",
     "debug": "^2.2.0",
-    "field-trip": "0.0.2",
+    "field-trip": "0.0.3",
     "ipfs-merkle-dag": "^0.5.0",
     "ipfs-unixfs": "^0.1.0",
     "is-ipfs": "^0.2.0",
diff --git a/src/exporter.js b/src/exporter.js
index 519079b6..2043bf19 100644
--- a/src/exporter.js
+++ b/src/exporter.js
@@ -91,32 +91,18 @@ function Exporter (hash, dagService, options) {
 
   // Logic to export a unixfs directory.
   let dirExporter = (node, name, add, done) => {
-    let init
-
     if (!add) throw new Error('add must be set')
     if (!done) throw new Error('done must be set')
 
-    var rs = new Readable()
+    this.push({content: null, path: name})
 
-    // Directory has no links
-    if (node.links.length === 0) {
-      init = false
-      rs._read = () => {
-        if (init) {
-          return
-        }
-        init = true
-        rs.push(node.data)
-        rs.push(null)
-      }
-      this.push({content: null, path: name})
-      done()
-    } else {
+    // Directory has links
+    if (node.links.length > 0) {
       node.links.forEach((link) => {
         add({ path: pathj.join(name, link.name), hash: link.hash })
       })
-      done()
     }
+    done()
   }
 
   // Traverse the DAG asynchronously
diff --git a/test/test-exporter.js b/test/test-exporter.js
index 69474a2c..abb77456 100644
--- a/test/test-exporter.js
+++ b/test/test-exporter.js
@@ -87,10 +87,12 @@ module.exports = function (repo) {
       expect(err).to.not.exist
     })
     testExport.pipe(concat((files) => {
-      expect(files[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
-      expect(files[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
-      expect(files[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
-      expect(files[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
+      expect(files[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN')
+      expect(files[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
+      expect(files[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
+      expect(files[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1')
+      expect(files[4].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
+      expect(files[5].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
       done()
     }))
   })

From a5b9816afd8412b3d7d2c22e22cb52b308351bdb Mon Sep 17 00:00:00 2001
From: Stephen Whitmore
Date: Thu, 23 Jun 2016 14:48:45 -0700
Subject: [PATCH 4/4] revisions
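
Initialize the init guard once at its declaration (let init = false)
instead of resetting it separately in each branch of fileExporter. The
flag makes rs._read a one-shot; schematically (data stands in for the
node's unmarshaled bytes):

    let init = false
    rs._read = () => {
      if (init) return // subsequent reads are no-ops
      init = true      // the first read pushes everything
      rs.push(data)
      rs.push(null)
    }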
---
 src/exporter.js | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/exporter.js b/src/exporter.js
index 2043bf19..1833cf41 100644
--- a/src/exporter.js
+++ b/src/exporter.js
@@ -36,7 +36,7 @@ function Exporter (hash, dagService, options) {
   this._read = (n) => {}
 
   let fileExporter = (node, name, done) => {
-    let init
+    let init = false
 
     if (!done) throw new Error('done must be set')
 
@@ -44,7 +44,6 @@ function Exporter (hash, dagService, options) {
     var rs = new Readable()
     if (node.links.length === 0) {
       const unmarshaledData = UnixFS.unmarshal(node.data)
-      init = false
       rs._read = () => {
         if (init) {
           return
@@ -56,7 +55,6 @@ function Exporter (hash, dagService, options) {
       this.push({ content: rs, path: name })
       done()
     } else {
-      init = false
       rs._read = () => {
         if (init) {
           return