Skip to content
This repository was archived by the owner on Aug 12, 2020. It is now read-only.

Builder refactoring, trickle builder and balanced builder #118

Merged
merged 30 commits into from
Jan 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
511c746
builder refactoring. trickle builder. balanced builder
pgte Dec 28, 2016
0e3f158
removed unused experimental builder
pgte Dec 28, 2016
6af7458
documented importer options
pgte Dec 28, 2016
977f41b
default builder strategy is now the balanced strategy
pgte Dec 28, 2016
c767848
removed unused test
pgte Dec 28, 2016
7854682
removed superfluous comment
pgte Dec 28, 2016
df48f69
fixed trickle builder
pgte Dec 29, 2016
50c5d35
removed superfluous comment
pgte Dec 29, 2016
cbe2ce4
using options.chunkerOptions for chunker-specific options
pgte Dec 29, 2016
f8b9e80
docs: corrected option name
pgte Dec 29, 2016
fedfc30
fix: error handling in trickle reducer
pgte Dec 29, 2016
8e8d3d6
using pull-pair instead of backpressure-less bespoke pair
pgte Dec 30, 2016
7647657
fixed trickle builder tests
pgte Jan 3, 2017
74482f3
recursive streaming trickle builder
pgte Jan 3, 2017
2b92345
missing dep
pgte Jan 3, 2017
01d8583
some style corrections
pgte Jan 3, 2017
0036314
importing multiple roots yields an error
pgte Jan 3, 2017
02cdefd
reinstated testing importing using flat and balanced strategies
pgte Jan 3, 2017
8ac163c
asserting that root node is one and only one
pgte Jan 3, 2017
e723586
testing import and export using various builder strategies
pgte Jan 3, 2017
b9a01f8
fixed error propagation into push streams
pgte Jan 3, 2017
03f49d4
simplified some iteration logic
pgte Jan 3, 2017
fedbe5f
default for maximum children per node is 174
pgte Jan 7, 2017
180b808
by default, only reduces one leaf to self if specific option is present
pgte Jan 8, 2017
937c292
test results reflect new default config
pgte Jan 8, 2017
0f706df
testing against big files generated from a pseudo-random byte stream generator
pgte Jan 9, 2017
0d3602e
added missing dep
pgte Jan 9, 2017
973c483
removed unnecessary dev dependency
pgte Jan 10, 2017
67fbf87
go-ipfs parity: no root node with single leaf
pgte Jan 10, 2017
ff6cce5
docs: corrected the default maximum number of children nodes
pgte Jan 10, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ been written into the [DAG Service][]'s storage mechanism.
The input's file paths and directory structure will be preserved in the DAG
Nodes.

### Importer options

In the second argument of the importer constructor you can specify the following options:

* `chunker` (string, defaults to `"fixed"`): the chunking strategy. Currently, only `"fixed"` is supported
* `chunkerOptions` (object, optional): the options for the chunker. Defaults to an object with the following properties:
* `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size for the `fixed` chunker.
* `strategy` (string, defaults to `"balanced"`): the DAG builder strategy name. Supports:
* `flat`: flat list of chunks
* `balanced`: builds a balanced tree
* `trickle`: builds [a trickle tree](https://github.com/ipfs/specs/pull/57#issuecomment-265205384)
* `maxChildrenPerNode` (positive integer, defaults to `174`): the maximum children per node for the `balanced` and `trickle` DAG builder strategies
* `layerRepeat` (positive integer, defaults to 4): (only applicable to the `trickle` DAG builder strategy). The maximum repetition of parent nodes for each layer of the tree.
* `reduceSingleLeafToSelf` (boolean, defaults to `false`): optimization that, when reducing a set containing a single node, uses that node itself as the result instead of wrapping it in a parent node.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌🏽


### Example Exporter

Expand Down
7 changes: 6 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,24 @@
"ipfs-repo": "^0.11.2",
"ncp": "^2.0.0",
"pre-commit": "^1.2.2",
"pull-generate": "^2.2.0",
"pull-zip": "^2.0.1",
"rimraf": "^2.5.4"
},
"dependencies": {
"async": "^2.1.4",
"cids": "^0.3.5",
"deep-extend": "^0.4.1",
"ipfs-unixfs": "^0.1.9",
"ipld-dag-pb": "^0.9.3",
"ipld-resolver": "^0.4.1",
"is-ipfs": "^0.2.1",
"multihashes": "^0.3.1",
"pull-batch": "^1.0.0",
"pull-block": "^1.0.2",
"pull-pair": "^1.1.0",
"pull-paramap": "^1.2.1",
"pull-pause": "0.0.0",
"pull-pushable": "^2.0.1",
"pull-stream": "^3.5.0",
"pull-traverse": "^1.0.3",
Expand All @@ -76,4 +81,4 @@
"jbenet <[email protected]>",
"nginnever <[email protected]>"
]
}
}
53 changes: 53 additions & 0 deletions src/builder/balanced/balanced-reducer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict'

const assert = require('assert')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const pullPair = require('pull-pair')
const batch = require('pull-batch')

module.exports = function balancedReduceToRoot (reduce, options) {
const pair = pullPair()
const source = pair.source

const result = pushable()

reduceToParents(source, (err, roots) => {
if (err) {
result.end(err)
return // early
}
assert.equal(roots.length, 1, 'need one root')
result.push(roots[0])
result.end()
})

function reduceToParents (_chunks, callback) {
let chunks = _chunks
if (Array.isArray(chunks)) {
chunks = pull.values(chunks)
}

pull(
chunks,
batch(options.maxChildrenPerNode),
pull.asyncMap(reduce),
pull.collect(reduced)
)

function reduced (err, roots) {
if (err) {
callback(err)
} else if (roots.length > 1) {
reduceToParents(roots, callback)
} else {
callback(null, roots)
}
}
}

return {
sink: pair.sink,
source: result
}
}
12 changes: 12 additions & 0 deletions src/builder/balanced/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict'

const balancedReducer = require('./balanced-reducer')

// Default of 174 children per node mirrors the go-ipfs importer; see
// https://github.com/ipfs/go-ipfs/blob/master/importer/helpers/helpers.go#L16-L35
// for where this value comes from.
const defaultOptions = {
  maxChildrenPerNode: 174
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a comment with a link to https://github.com/ipfs/go-ipfs/blob/master/importer/helpers/helpers.go#L16-L35 so that we remember where this value comes from


module.exports = function (reduce, _options) {
const options = Object.assign({}, defaultOptions, _options)
return balancedReducer(reduce, options)
}
150 changes: 150 additions & 0 deletions src/builder/builder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
'use strict'

const extend = require('deep-extend')
const assert = require('assert')
const UnixFS = require('ipfs-unixfs')
const pull = require('pull-stream')
const through = require('pull-through')
const parallel = require('async/parallel')
const waterfall = require('async/waterfall')
const dagPB = require('ipld-dag-pb')
const CID = require('cids')

const reduce = require('./reduce')

const DAGNode = dagPB.DAGNode

// 262144 bytes = 256 KiB: the default maximum chunk size for the `fixed`
// chunker (documented in the README under "Importer options").
const defaultOptions = {
  chunkerOptions: {
    maxChunkSize: 262144
  }
}

module.exports = function (createChunker, ipldResolver, createReducer, _options) {
const options = extend({}, defaultOptions, _options)

return function (source, files) {
return function (items, cb) {
parallel(items.map((item) => (cb) => {
if (!item.content) {
// item is a directory
return createAndStoreDir(item, (err, node) => {
if (err) {
return cb(err)
}
source.push(node)
files.push(node)
cb()
})
}

// item is a file
createAndStoreFile(item, (err, node) => {
if (err) {
return cb(err)
}
source.push(node)
files.push(node)
cb()
})
}), cb)
}
}

function createAndStoreDir (item, callback) {
// 1. create the empty dir dag node
// 2. write it to the dag store

const d = new UnixFS('directory')
waterfall([
(cb) => DAGNode.create(d.marshal(), cb),
(node, cb) => {
ipldResolver.put({
node: node,
cid: new CID(node.multihash)
}, (err) => cb(err, node))
}
], (err, node) => {
if (err) {
return callback(err)
}
callback(null, {
path: item.path,
multihash: node.multihash,
size: node.size
})
})
}

function createAndStoreFile (file, callback) {
if (Buffer.isBuffer(file.content)) {
file.content = pull.values([file.content])
}

if (typeof file.content !== 'function') {
return callback(new Error('invalid content'))
}

const reducer = createReducer(reduce(file, ipldResolver, options), options)

let previous
let count = 0

pull(
file.content,
createChunker(options.chunkerOptions),
pull.map(chunk => new Buffer(chunk)),
pull.map(buffer => new UnixFS('file', buffer)),
pull.asyncMap((fileNode, callback) => {
DAGNode.create(fileNode.marshal(), (err, node) => {
callback(err, { DAGNode: node, fileNode: fileNode })
})
}),
pull.asyncMap((leaf, callback) => {
ipldResolver.put(
{
node: leaf.DAGNode,
cid: new CID(leaf.DAGNode.multihash)
},
err => callback(err, leaf)
)
}),
pull.map((leaf) => {
return {
path: file.path,
multihash: leaf.DAGNode.multihash,
size: leaf.DAGNode.size,
leafSize: leaf.fileNode.fileSize(),
name: ''
}
}),
through( // mark as single node if only one single node
function onData (data) {
count++
if (previous) {
this.queue(previous)
}
previous = data
},
function ended () {
if (previous) {
if (count === 1) {
previous.single = true
}
this.queue(previous)
}
this.queue(null)
}
),
reducer,
pull.collect((err, roots) => {
if (err) {
callback(err)
} else {
assert.equal(roots.length, 1, 'should result in exactly one root')
callback(null, roots[0])
}
})
)
}
}
29 changes: 29 additions & 0 deletions src/builder/create-build-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict'

const pullPushable = require('pull-pushable')
const pullWrite = require('pull-write')

module.exports = function createBuildStream (createStrategy, ipldResolver, flushTree, options) {
const files = []

const source = pullPushable()

const sink = pullWrite(
createStrategy(source, files),
null,
options.highWaterMark,
(err) => {
if (err) {
source.end(err)
return // early
}

flushTree(files, ipldResolver, source, source.end)
}
)

return {
source: source,
sink: sink
}
}
33 changes: 33 additions & 0 deletions src/builder/flat/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const assert = require('assert')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const pullPair = require('pull-pair')
const batch = require('pull-batch')

module.exports = function (reduce, options) {
const pair = pullPair()
const source = pair.source
const result = pushable()

pull(
source,
batch(Infinity),
pull.asyncMap(reduce),
pull.collect((err, roots) => {
if (err) {
result.end(err)
return // early
}
assert.equal(roots.length, 1, 'need one root')
result.push(roots[0])
result.end()
})
)

return {
sink: pair.sink,
source: result
}
}
33 changes: 33 additions & 0 deletions src/builder/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const assert = require('assert')
const createBuildStream = require('./create-build-stream')
const Builder = require('./builder')

// Available DAG-building strategies, keyed by the `strategy` option value.
const reducers = {
  flat: require('./flat'),
  balanced: require('./balanced'),
  trickle: require('./trickle')
}

// Defaults applied when the caller omits options (see README,
// "Importer options").
const defaultOptions = {
  strategy: 'balanced',
  highWaterMark: 100,
  reduceSingleLeafToSelf: false
}

module.exports = function (Chunker, ipldResolver, flushTree, _options) {
assert(Chunker, 'Missing chunker creator function')
assert(ipldResolver, 'Missing IPLD Resolver')
assert(flushTree, 'Missing flushTree argument')

const options = Object.assign({}, defaultOptions, _options)

const strategyName = options.strategy
const reducer = reducers[strategyName]
assert(reducer, 'Unknown importer build strategy name: ' + strategyName)

const createStrategy = Builder(Chunker, ipldResolver, reducer, options)

return createBuildStream(createStrategy, ipldResolver, flushTree, options)
}
Loading