diff --git a/README.md b/README.md
index 6a1ced84..d56c27b1 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
 IPFS unixFS Engine
 ===================
 
-> Import data into an IPFS DAG Service.
+> Import & Export data to/from an [IPFS DAG Service][]
 
 [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
 [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
@@ -47,8 +47,8 @@ const res = []
 const rs = fs.createReadStream(file)
 const rs2 = fs.createReadStream(file2)
 
-const input = {path: /tmp/foo/bar, stream: rs}
-const input2 = {path: /tmp/foo/quxx, stream: rs2}
+const input = {path: '/tmp/foo/bar', content: rs}
+const input2 = {path: '/tmp/foo/quxx', content: rs2}
 
 // Listen for the data event from the importer stream
@@ -74,24 +74,24 @@ When run, the stat of DAG Node is outputted for each file on data event until th
 ```
 { multihash: ,
-  Size: 39243,
+  size: 39243,
   path: '/tmp/foo/bar' }
 { multihash: ,
-  Size: 59843,
+  size: 59843,
   path: '/tmp/foo/quxx' }
 { multihash: ,
-  Size: 93242,
+  size: 93242,
   path: '/tmp/foo' }
 { multihash: ,
-  Size: 94234,
+  size: 94234,
   path: '/tmp' }
 ```
 
-## API
+## Importer API
 
 ```js
 const Importer = require('ipfs-unixfs-engine').importer
@@ -99,16 +99,22 @@ const Importer = require('ipfs-unixfs-engine').importer
 ```
 
 ### const add = new Importer(dag)
 
-The importer is a duplex stream in object mode that writes inputs of tuples
-of path and readable streams of data. You can stream an array of files to the
-importer, just call the 'end' function to signal that you are done inputting file/s.
-Listen to the 'data' for the returned informtion 'multihash, size and path' for
-each file added. Listen to the 'end' event from the stream to know when the
-importer has finished importing files. Input file paths with directory structure
-will preserve the hierarchy in the dag node.
+The importer is a Duplex stream in object mode that accepts objects of the form
+
+```js
+{
+  path: 'a name',
+  content: (Buffer or Readable stream)
+}
+```
+
+The stream will output IPFS DAG Node stats for the nodes it has added to the
+DAG Service. When stats on a node are emitted, they are guaranteed to have been
+written into the DAG Service's storage mechanism.
+
+The input's file paths and directory structure will be preserved in the DAG
+Nodes.
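+
+As a minimal sketch, a single file can also be added straight from a Buffer
+(`dag` is assumed to be a DAG Service instance, and the file name and contents
+below are only placeholders):
+
+```js
+const Importer = require('ipfs-unixfs-engine').importer
+
+const add = new Importer(dag)
+
+add.on('data', (stat) => {
+  // stat looks like { multihash: <Buffer>, size: <Number>, path: 'hello.txt' }
+  console.log(stat)
+})
+
+add.on('end', () => {
+  // every emitted node has already been written to the DAG Service
+})
+
+add.write({path: 'hello.txt', content: new Buffer('hello world')})
+add.end()
+```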
 
-Uses the [DAG Service](https://github.com/vijayee/js-ipfs-merkle-dag/) instance
-`dagService`.
 
 ## Example Exporter
@@ -133,15 +139,25 @@ exportEvent.on('data', (result) => {
 }
 ```
 
-##API
+## Exporter API
 
 ```js
-const Importer = require('ipfs-unixfs-engine').exporter
+const Exporter = require('ipfs-unixfs-engine').exporter
 ```
 
-The exporter is a readable stream in object mode that returns an object ```{ stream: stream, path: 'path' }``` by the multihash of the file from the dag service.
+Given the multihash of a file or directory in the DAG Service, the exporter is
+a readable stream in object mode that outputs objects of the form
+
+```js
+{
+  path: 'a name',
+  content: (a Readable stream of file data, or null for a directory)
+}
+```
 
-## install
+
+## Install
 
 With [npm](https://npmjs.org/) installed, run
 
@@ -149,6 +165,9 @@ With [npm](https://npmjs.org/) installed, run
 $ npm install ipfs-unixfs-engine
 ```
 
-## license
+## License
 
 ISC
+
+
+[IPFS DAG Service]: https://github.com/vijayee/js-ipfs-merkle-dag/
diff --git a/package.json b/package.json
index 1d3587e9..1f6445b9 100644
--- a/package.json
+++ b/package.json
@@ -2,7 +2,7 @@
   "name": "ipfs-unixfs-engine",
   "version": "0.8.0",
   "description": "JavaScript implementation of the unixfs Engine used by IPFS",
-  "main": "lib/index.js",
+  "main": "src/index.js",
   "jsnext:main": "src/index.js",
   "scripts": {
     "lint": "aegir-lint",
@@ -56,8 +56,10 @@
     "debug": "^2.2.0",
     "ipfs-merkle-dag": "^0.5.0",
     "ipfs-unixfs": "^0.1.0",
+    "isstream": "^0.1.2",
     "readable-stream": "^1.1.13",
     "run-series": "^1.1.4",
+    "streamifier": "^0.1.1",
     "through2": "^2.0.0"
   },
   "contributors": [
@@ -68,4 +70,4 @@
     "greenkeeperio-bot ",
     "nginnever "
   ]
-}
\ No newline at end of file
+}
diff --git a/src/exporter.js b/src/exporter.js
index dacff549..3bfaaab0 100644
--- a/src/exporter.js
+++ b/src/exporter.js
@@ -42,7 +42,7 @@ function Exporter (hash, dagService, options) {
         rs.push(unmarshaledData.data)
         rs.push(null)
       }
-      this.push({ stream: rs, path: name })
+      this.push({ content: rs, path: name })
       callback()
       return
     } else {
@@ -75,7 +75,7 @@ function Exporter (hash, dagService, options) {
           return
         })
       }
-      this.push({ stream: rs, path: name })
+      this.push({ content: rs, path: name })
       callback()
       return
     }
@@ -97,7 +97,7 @@ function Exporter (hash, dagService, options) {
         rs.push(node.data)
         rs.push(null)
       }
-      this.push({stream: null, path: name})
+      this.push({content: null, path: name})
       callback()
       return
     } else {
diff --git a/src/importer.js b/src/importer.js
index a5506607..aede8925 100644
--- a/src/importer.js
+++ b/src/importer.js
@@ -10,6 +10,8 @@ const UnixFS = require('ipfs-unixfs')
 const util = require('util')
 const bs58 = require('bs58')
 const Duplex = require('readable-stream').Duplex
+const isStream = require('isstream')
+const streamifier = require('streamifier')
 
 exports = module.exports = Importer
@@ -36,7 +38,7 @@ function Importer (dagService, options) {
   this._write = (fl, enc, next) => {
     this.read()
     counter++
-    if (!fl.stream) {
+    if (!fl.content) {
       // 1. create the empty dir dag node
       // 2. write it to the dag store
       // 3. add to the files array {path: <>, hash: <>}
@@ -63,8 +65,20 @@
       return
     }
 
+    // Convert a buffer to a readable stream
+    if (Buffer.isBuffer(fl.content)) {
+      const r = streamifier.createReadStream(fl.content)
+      fl.content = r
+    }
+
+    // Bail if 'content' is not readable
+    if (!isStream.isReadable(fl.content)) {
+      this.emit('error', new Error('"content" is not a Buffer nor Readable stream'))
+      return
+    }
+
     const leaves = []
-    fl.stream
+    fl.content
       .pipe(fsc(CHUNK_SIZE))
       .pipe(through2((chunk, enc, cb) => {
         // 1. create the unixfs merkledag node
@@ -224,13 +238,15 @@ function Importer (dagService, options) {
     // If the value is not an object
     // add as a link to the dirNode
 
-    function traverse (tree, base) {
+    let pendingWrites = 0
+
+    function traverse (tree, path, done) {
       const keys = Object.keys(tree)
       let tmpTree = tree
      keys.map((key) => {
         if (typeof tmpTree[key] === 'object' && !Buffer.isBuffer(tmpTree[key])) {
-          tmpTree[key] = traverse.call(this, tmpTree[key], base ? base + '/' + key : key)
+          tmpTree[key] = traverse.call(this, tmpTree[key], path ? path + '/' + key : key, done)
         }
       })
@@ -250,28 +266,39 @@
       })
 
       n.data = d.marshal()
+
+      pendingWrites++
       dagService.add(n, (err) => {
+        pendingWrites--
         if (err) {
           this.push({error: 'failed to store dirNode'})
+        } else if (path) {
+          const el = {
+            path: path,
+            multihash: n.multihash(),
+            size: n.size()
+          }
+          this.push(el)
+        }
+
+        if (pendingWrites <= 0) {
+          done()
         }
       })
 
-      if (!base) {
+      if (!path) {
         return
       }
 
-      const el = {
-        path: base,
-        multihash: n.multihash(),
-        size: n.size()
-      }
-      this.push(el)
-
       mhIndex[bs58.encode(n.multihash())] = { size: n.size() }
       return n.multihash()
     }
-    /* const rootHash = */ traverse.call(this, fileTree)
-    this.push(null)
+
+    let self = this
+    /* const rootHash = */ traverse.call(this, fileTree, null, function () {
+      self.push(null)
+    })
     }
   }
 }
diff --git a/test/test-exporter.js b/test/test-exporter.js
index 91346eaf..8e4f79db 100644
--- a/test/test-exporter.js
+++ b/test/test-exporter.js
@@ -33,7 +33,7 @@ module.exports = function (repo) {
         expect(err).to.not.exist
         const testExport = exporter(hash, ds)
         testExport.on('data', (file) => {
-          file.stream.pipe(bl((err, bldata) => {
+          file.content.pipe(bl((err, bldata) => {
             expect(err).to.not.exist
             expect(bldata).to.deep.equal(unmarsh.data)
             done()
@@ -48,7 +48,7 @@ module.exports = function (repo) {
       const ds = new DAGService(bs)
       const testExport = exporter(hash, ds)
       testExport.on('data', (file) => {
-        file.stream.pipe(bl((err, bldata) => {
+        file.content.pipe(bl((err, bldata) => {
          expect(bldata).to.deep.equal(bigFile)
          expect(err).to.not.exist
          done()
@@ -63,7 +63,7 @@ module.exports = function (repo) {
       const testExport = exporter(hash, ds)
       testExport.on('data', (file) => {
         expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
-        file.stream.pipe(bl((err, bldata) => {
+        file.content.pipe(bl((err, bldata) => {
           expect(err).to.not.exist
           done()
         }))
@@ -94,7 +94,7 @@ module.exports = function (repo) {
       const ds = new DAGService(bs)
       const testExport = exporter(hash, ds)
       testExport.on('data', (dir) => {
-        expect(dir.stream).to.equal(null)
+        expect(dir.content).to.equal(null)
         done()
       })
     })
diff --git a/test/test-importer.js b/test/test-importer.js
index edf77f03..fea1c465 100644
--- a/test/test-importer.js
+++ b/test/test-importer.js
@@ -29,6 +29,17 @@ module.exports = function (repo) {
       done()
     })
 
+    it('bad input', (done) => {
+      const r = 'banana'
+      const i = new Importer(ds)
+      i.on('error', (err) => {
+        expect(err).to.exist
+        done()
+      })
+      i.write({path: '200Bytes.txt', content: r})
+      i.end()
+    })
+
     it('small file (smaller than a chunk)', (done) => {
       const buffered = smallFile
       const r = streamifier.createReadStream(buffered)
@@ -38,7 +49,22 @@ module.exports = function (repo) {
         expect(bs58.encode(obj.multihash)).to.equal('QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8')
         expect(obj.size).to.equal(211)
       })
-      i.write({path: '200Bytes.txt', stream: r})
+      i.write({path: '200Bytes.txt', content: r})
+      i.end()
+      i.on('end', () => {
+        done()
+      })
+    })
+
+    it('small file as buffer (smaller than a chunk)', (done) => {
+      const buffered = smallFile
+      const i = new Importer(ds)
+      i.on('data', (obj) => {
+        expect(obj.path).to.equal('200Bytes.txt')
+        expect(bs58.encode(obj.multihash)).to.equal('QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8')
+        expect(obj.size).to.equal(211)
+      })
+      i.write({path: '200Bytes.txt', content: buffered})
       i.end()
       i.on('end', () => {
         done()
@@ -69,7 +95,7 @@ module.exports = function (repo) {
      i.on('end', () => {
        done()
      })
-      i.write({path: 'foo/bar/200Bytes.txt', stream: r})
+      i.write({path: 'foo/bar/200Bytes.txt', content: r})
       i.end()
     })
@@ -85,7 +111,7 @@ module.exports = function (repo) {
       i.on('end', () => {
         done()
       })
-      i.write({path: '1.2MiB.txt', stream: r})
+      i.write({path: '1.2MiB.txt', content: r})
       i.end()
     })
@@ -106,7 +132,7 @@ module.exports = function (repo) {
       i.on('end', () => {
         done()
       })
-      i.write({path: 'foo-big/1.2MiB.txt', stream: r})
+      i.write({path: 'foo-big/1.2MiB.txt', content: r})
       i.end()
     })
@@ -156,8 +182,8 @@ module.exports = function (repo) {
       i.on('end', () => {
         done()
       })
-      i.write({path: 'pim/200Bytes.txt', stream: r1})
-      i.write({path: 'pim/1.2MiB.txt', stream: r2})
+      i.write({path: 'pim/200Bytes.txt', content: r1})
+      i.write({path: 'pim/1.2MiB.txt', content: r2})
       i.end()
     })
@@ -195,9 +221,9 @@ module.exports = function (repo) {
       i.on('end', () => {
         done()
       })
-      i.write({path: 'pam/pum/200Bytes.txt', stream: r1})
-      i.write({path: 'pam/pum/1.2MiB.txt', stream: r2})
-      i.write({path: 'pam/1.2MiB.txt', stream: r3})
+      i.write({path: 'pam/pum/200Bytes.txt', content: r1})
+      i.write({path: 'pam/pum/1.2MiB.txt', content: r2})
+      i.write({path: 'pam/1.2MiB.txt', content: r3})
       i.end()
     })
   })