This repository was archived by the owner on Aug 12, 2020. It is now read-only.

Rename "stream" to "content" in tuples. #43

Merged · 6 commits · May 26, 2016
Changes from 3 commits
6 changes: 3 additions & 3 deletions README.md
@@ -47,8 +47,8 @@ const res = []

const rs = fs.createReadStream(file)
const rs2 = fs.createReadStream(file2)
-const input = {path: '/tmp/foo/bar', stream: rs}
-const input2 = {path: '/tmp/foo/quxx', stream: rs2}
+const input = {path: '/tmp/foo/bar', content: rs}
+const input2 = {path: '/tmp/foo/quxx', content: rs2}

// Listen for the data event from the importer stream
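The hunk cuts off before the listener itself; as a hedged sketch (not the README's verbatim text), the renamed tuples would flow through the importer like this, assuming `ds` is a dag service created earlier in the README and that the module exposes `importer` alongside the `exporter` shown below (the tests in this PR use `new Importer(ds)` the same way):

```js
const Importer = require('ipfs-unixfs-engine').importer

const i = new Importer(ds)
i.on('data', (obj) => {
  // one { path, multihash, size } object per file or directory stored
  res.push(obj)
})
i.on('end', () => console.log(res))
i.write(input)
i.write(input2)
i.end()
```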

@@ -138,7 +138,7 @@ exportEvent.on('data', (result) => {
const Importer = require('ipfs-unixfs-engine').exporter
@daviddias (Contributor) commented on May 26, 2016:
    const Exporter = require('ipfs-unixfs-engine').exporter

```

-The exporter is a readable stream in object mode that returns an object ```{ stream: stream, path: 'path' }``` by the multihash of the file from the dag service.
+The exporter is a readable stream in object mode that returns an object ```{ content: stream, path: 'path' }``` by the multihash of the file from the dag service.
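To make that shape concrete, here is a minimal consumption sketch, assuming `hash` is the multihash of an imported file and `ds` is a dag service (mirroring the tests further down):

```js
const exporter = require('ipfs-unixfs-engine').exporter

const exportStream = exporter(hash, ds)
exportStream.on('data', (file) => {
  // file is { content: <Readable|null>, path: <string> };
  // directories are emitted with content === null
  if (file.content) {
    file.content.pipe(process.stdout)
  }
})
```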


## install
2 changes: 2 additions & 0 deletions package.json
@@ -56,8 +56,10 @@
"debug": "^2.2.0",
"ipfs-merkle-dag": "^0.5.0",
"ipfs-unixfs": "^0.1.0",
"isstream": "^0.1.2",
"readable-stream": "^1.1.13",
"run-series": "^1.1.4",
"streamifier": "^0.1.1",
"through2": "^2.0.0"
},
"contributors": [
6 changes: 3 additions & 3 deletions src/exporter.js
@@ -42,7 +42,7 @@ function Exporter (hash, dagService, options) {
rs.push(unmarshaledData.data)
rs.push(null)
}
-this.push({ stream: rs, path: name })
+this.push({ content: rs, path: name })
callback()
return
} else {
@@ -75,7 +75,7 @@ function Exporter (hash, dagService, options) {
return
})
}
-this.push({ stream: rs, path: name })
+this.push({ content: rs, path: name })
callback()
return
}
@@ -97,7 +97,7 @@ function Exporter (hash, dagService, options) {
rs.push(node.data)
rs.push(null)
}
-this.push({stream: null, path: name})
+this.push({content: null, path: name})
callback()
return
} else {
53 changes: 40 additions & 13 deletions src/importer.js
@@ -10,6 +10,8 @@ const UnixFS = require('ipfs-unixfs')
const util = require('util')
const bs58 = require('bs58')
const Duplex = require('readable-stream').Duplex
+const isStream = require('isstream')
+const streamifier = require('streamifier')

exports = module.exports = Importer

@@ -36,7 +38,7 @@ function Importer (dagService, options) {
this._write = (fl, enc, next) => {
this.read()
counter++
-if (!fl.stream) {
+if (!fl.content) {
// 1. create the empty dir dag node
// 2. write it to the dag store
// 3. add to the files array {path: <>, hash: <>}
@@ -63,8 +65,20 @@
return
}

+// Convert a buffer to a readable stream
+if (Buffer.isBuffer(fl.content)) {
+const r = streamifier.createReadStream(fl.content)
+fl.content = r
+}
+
+// Bail if 'content' is not readable
+if (!isStream.isReadable(fl.content)) {
+this.emit('error', new Error('"content" is neither a Buffer nor a Readable stream'))
+return
+}

const leaves = []
-fl.stream
+fl.content
.pipe(fsc(CHUNK_SIZE))
.pipe(through2((chunk, enc, cb) => {
// 1. create the unixfs merkledag node
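For reference, a standalone sketch of how the two new dependencies behave, assuming the published isstream@0.1.x and streamifier@0.1.x APIs:

```js
const isStream = require('isstream')
const streamifier = require('streamifier')

let content = new Buffer('some bytes') // Buffer.from() on Node >= 4.5
if (Buffer.isBuffer(content)) {
  // streamifier wraps the Buffer in a Readable stream
  content = streamifier.createReadStream(content)
}
console.log(isStream.isReadable(content)) // true
console.log(isStream.isReadable('banana')) // false -> importer emits an error
```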
@@ -224,13 +238,15 @@ function Importer (dagService, options) {
// If the value is not an object
// add as a link to the dirNode

-function traverse (tree, base) {
+let pendingWrites = 0
+
+function traverse (tree, base, done) {
const keys = Object.keys(tree)
let tmpTree = tree
keys.map((key) => {
if (typeof tmpTree[key] === 'object' &&
!Buffer.isBuffer(tmpTree[key])) {
-tmpTree[key] = traverse.call(this, tmpTree[key], base ? base + '/' + key : key)
+tmpTree[key] = traverse.call(this, tmpTree[key], base ? base + '/' + key : key, done)
}
})

@@ -250,28 +266,39 @@
})

n.data = d.marshal()

+pendingWrites++
dagService.add(n, (err) => {
+pendingWrites--
if (err) {
this.push({error: 'failed to store dirNode'})
+} else if (base) {
Contributor commented:
    what is this base ?
+const el = {
+path: base,
+multihash: n.multihash(),
+yes: 'no',
+size: n.size()
+}
+this.push(el)
+}
+
+if (pendingWrites <= 0) {
+done()
+}
})

-if (!base) {
-return
-}
-
-const el = {
-path: base,
-multihash: n.multihash(),
-size: n.size()
-}
-this.push(el)

mhIndex[bs58.encode(n.multihash())] = { size: n.size() }
return n.multihash()
}
-/* const rootHash = */ traverse.call(this, fileTree)
-this.push(null)

+let self = this
+/* const rootHash = */ traverse.call(this, fileTree, null, function () {
+self.push(null)
+})
}
}
}
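The `pendingWrites` counter plus `done` callback is a standard Node completion-tracking idiom. A minimal sketch of the pattern in isolation (illustrative names, not this module's code):

```js
// Track in-flight async writes; call `done` exactly once when all settle.
function storeAll (nodes, store, done) {
  let pending = nodes.length
  if (pending === 0) return done()
  nodes.forEach((node) => {
    store(node, (err) => {
      if (err) console.error('store failed:', err)
      if (--pending === 0) done()
    })
  })
}
```

Counting up front avoids calling `done` early when a callback fires before the next write has been queued, a race the increment-as-you-go version in this diff can hit if `dagService.add` ever completes synchronously.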
8 changes: 4 additions & 4 deletions test/test-exporter.js
@@ -33,7 +33,7 @@ module.exports = function (repo) {
expect(err).to.not.exist
const testExport = exporter(hash, ds)
testExport.on('data', (file) => {
-file.stream.pipe(bl((err, bldata) => {
+file.content.pipe(bl((err, bldata) => {
expect(err).to.not.exist
expect(bldata).to.deep.equal(unmarsh.data)
done()
@@ -48,7 +48,7 @@ module.exports = function (repo) {
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('data', (file) => {
-file.stream.pipe(bl((err, bldata) => {
+file.content.pipe(bl((err, bldata) => {
expect(bldata).to.deep.equal(bigFile)
expect(err).to.not.exist
done()
@@ -63,7 +63,7 @@
const testExport = exporter(hash, ds)
testExport.on('data', (file) => {
expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
-file.stream.pipe(bl((err, bldata) => {
+file.content.pipe(bl((err, bldata) => {
expect(err).to.not.exist
done()
}))
@@ -94,7 +94,7 @@ module.exports = function (repo) {
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('data', (dir) => {
-expect(dir.stream).to.equal(null)
+expect(dir.content).to.equal(null)
done()
})
})
44 changes: 35 additions & 9 deletions test/test-importer.js
@@ -29,6 +29,17 @@ module.exports = function (repo) {
done()
})

+it('bad input', (done) => {
+const r = 'banana'
+const i = new Importer(ds)
+i.on('error', (err) => {
+expect(err).to.exist
+done()
+})
+i.write({path: '200Bytes.txt', content: r})
+i.end()
+})

Contributor commented:
    what about a good banana (inside a buffer) ? :)

it('small file (smaller than a chunk)', (done) => {
const buffered = smallFile
const r = streamifier.createReadStream(buffered)
@@ -38,7 +49,22 @@
expect(bs58.encode(obj.multihash)).to.equal('QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8')
expect(obj.size).to.equal(211)
})
-i.write({path: '200Bytes.txt', stream: r})
+i.write({path: '200Bytes.txt', content: r})
i.end()
i.on('end', () => {
done()
})
})

+it('small file as buffer (smaller than a chunk)', (done) => {
+const buffered = smallFile
+const i = new Importer(ds)
+i.on('data', (obj) => {
+expect(obj.path).to.equal('200Bytes.txt')
+expect(bs58.encode(obj.multihash)).to.equal('QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8')
+expect(obj.size).to.equal(211)
+})
+i.write({path: '200Bytes.txt', content: buffered})
+i.end()
+i.on('end', () => {
+done()
@@ -69,7 +95,7 @@ module.exports = function (repo) {
i.on('end', () => {
done()
})
-i.write({path: 'foo/bar/200Bytes.txt', stream: r})
+i.write({path: 'foo/bar/200Bytes.txt', content: r})
i.end()
})

@@ -85,7 +111,7 @@
i.on('end', () => {
done()
})
-i.write({path: '1.2MiB.txt', stream: r})
+i.write({path: '1.2MiB.txt', content: r})
i.end()
})

@@ -106,7 +132,7 @@
i.on('end', () => {
done()
})
-i.write({path: 'foo-big/1.2MiB.txt', stream: r})
+i.write({path: 'foo-big/1.2MiB.txt', content: r})
i.end()
})

@@ -156,8 +182,8 @@
i.on('end', () => {
done()
})
-i.write({path: 'pim/200Bytes.txt', stream: r1})
-i.write({path: 'pim/1.2MiB.txt', stream: r2})
+i.write({path: 'pim/200Bytes.txt', content: r1})
+i.write({path: 'pim/1.2MiB.txt', content: r2})
i.end()
})

@@ -195,9 +221,9 @@
i.on('end', () => {
done()
})
-i.write({path: 'pam/pum/200Bytes.txt', stream: r1})
-i.write({path: 'pam/pum/1.2MiB.txt', stream: r2})
-i.write({path: 'pam/1.2MiB.txt', stream: r3})
+i.write({path: 'pam/pum/200Bytes.txt', content: r1})
+i.write({path: 'pam/pum/1.2MiB.txt', content: r2})
+i.write({path: 'pam/1.2MiB.txt', content: r3})
i.end()
})
})