Skip to content
This repository was archived by the owner on Apr 29, 2020. It is now read-only.

Commit 0bff5f2

Browse files
authored
feat: allow overriding of internal functions (#48)
* feat: allow overriding of internal functions. The interesting bits of this module are the various ways of building DAGs for files and folders (flat, sharded, etc.). Sometimes we want to reuse parts of this logic to build DAGs without having to reimplement big chunks of this module. This PR allows the user to pass in functions that replace key parts of the import pipeline. Enables: * ipfs-inactive/js-ipfs-mfs#73 * #46 * docs: add a description of the internal functions
1 parent 2ddf00d commit 0bff5f2

File tree

10 files changed

+266
-60
lines changed

10 files changed

+266
-60
lines changed

README.md

+25
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
- [Example](#example)
2525
- [API](#api)
2626
- [const import = importer(source, ipld [, options])](#const-import--importersource-ipld--options)
27+
- [Overriding internals](#overriding-internals)
2728
- [Contribute](#contribute)
2829
- [License](#license)
2930

@@ -145,8 +146,32 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
145146
- `blockWriteConcurrency` (positive integer, defaults to 10) How many blocks to hash and write to the block store concurrently. For small numbers of large files this should be high (e.g. 50).
146147
- `fileImportConcurrency` (number, defaults to 50) How many files to import concurrently. For large numbers of small files this should be high (e.g. 50).
147148

149+
## Overriding internals
150+
151+
Several aspects of the importer are overridable by specifying functions as part of the options object with these keys:
152+
153+
- `chunkValidator` (function): Optional function that supports the signature `async function * (source, options)`
154+
- This function takes the values of the `content` field of imported entries. It should transform them into `Buffer`s, throwing an error if it cannot.
155+
- It should yield `Buffer` objects constructed from the `source` or throw an `Error`
156+
- `chunker` (function): Optional function that supports the signature `async function * (source, options)` where `source` is an async generator and `options` is an options object
157+
- It should yield `Buffer` objects.
158+
- `bufferImporter` (function): Optional function that supports the signature `async function * (entry, source, ipld, options)`
159+
- This function should read `Buffer`s from `source` and persist them using `ipld.put` or similar
160+
- `entry` is the `{ path, content }` entry, `source` is an async generator that yields Buffers
161+
- It should yield functions that return a Promise that resolves to an object with the properties `{ cid, unixfs, size }` where `cid` is a [CID], `unixfs` is a [UnixFS] entry and `size` is a `Number` that represents the serialized size of the [IPLD] node that holds the buffer data.
162+
- Values will be pulled from this generator in parallel - the amount of parallelisation is controlled by the `blockWriteConcurrency` option (default: 10)
163+
- `dagBuilder` (function): Optional function that supports the signature `async function * (source, ipld, options)`
164+
- This function should read `{ path, content }` entries from `source` and turn them into DAGs
165+
- It should yield a `function` that returns a `Promise` that resolves to `{ cid, path, unixfs, node }` where `cid` is a `CID`, `path` is a string, `unixfs` is a UnixFS entry and `node` is a `DAGNode`.
166+
- Values will be pulled from this generator in parallel - the amount of parallelisation is controlled by the `fileImportConcurrency` option (default: 50)
167+
- `treeBuilder` (function): Optional function that supports the signature `async function * (source, ipld, options)`
168+
- This function should read `{ cid, path, unixfs, node }` entries from `source` and place them in a directory structure
169+
- It should yield an object with the properties `{ cid, path, unixfs, size }` where `cid` is a `CID`, `path` is a string, `unixfs` is a UnixFS entry and `size` is a `Number`.
170+
148171
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
149172
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
173+
[IPLD]: https://github.com/ipld/js-ipld
174+
[CID]: https://github.com/multiformats/js-cid
150175

151176
## Contribute
152177

src/dag-builder/dir.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ const dirBuilder = async (item, ipld, options) => {
2121
cid,
2222
path,
2323
unixfs,
24-
node
24+
size: node.size
2525
}
2626
}
2727

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
'use strict'
2+
3+
const UnixFS = require('ipfs-unixfs')
4+
const persist = require('../../utils/persist')
5+
const {
6+
DAGNode
7+
} = require('ipld-dag-pb')
8+
9+
async function * bufferImporter (file, source, ipld, options) {
10+
for await (const buffer of source) {
11+
yield async () => {
12+
options.progress(buffer.length)
13+
let node
14+
let unixfs
15+
let size
16+
17+
const opts = {
18+
...options
19+
}
20+
21+
if (options.rawLeaves) {
22+
node = buffer
23+
size = buffer.length
24+
25+
opts.codec = 'raw'
26+
opts.cidVersion = 1
27+
} else {
28+
unixfs = new UnixFS({
29+
type: options.leafType,
30+
data: buffer,
31+
mtime: file.mtime,
32+
mode: file.mode
33+
})
34+
35+
node = new DAGNode(unixfs.marshal())
36+
size = node.size
37+
}
38+
39+
const cid = await persist(node, ipld, opts)
40+
41+
return {
42+
cid: cid,
43+
unixfs,
44+
size
45+
}
46+
}
47+
}
48+
}
49+
50+
module.exports = bufferImporter

src/dag-builder/file/index.js

+13-46
Original file line numberDiff line numberDiff line change
@@ -16,49 +16,18 @@ const dagBuilders = {
1616
trickle: require('./trickle')
1717
}
1818

19-
async function * importBuffer (file, source, ipld, options) {
20-
for await (const buffer of source) {
21-
yield async () => {
22-
options.progress(buffer.length)
23-
let node
24-
let unixfs
25-
26-
const opts = {
27-
...options
28-
}
29-
30-
if (options.rawLeaves) {
31-
node = buffer
32-
33-
opts.codec = 'raw'
34-
opts.cidVersion = 1
35-
} else {
36-
unixfs = new UnixFS({
37-
type: options.leafType,
38-
data: buffer,
39-
mtime: file.mtime,
40-
mode: file.mode
41-
})
42-
43-
node = new DAGNode(unixfs.marshal())
44-
}
45-
46-
const cid = await persist(node, ipld, opts)
47-
48-
return {
49-
cid: cid,
50-
unixfs,
51-
node
52-
}
53-
}
54-
}
55-
}
56-
5719
async function * buildFileBatch (file, source, ipld, options) {
5820
let count = -1
5921
let previous
22+
let bufferImporter
23+
24+
if (typeof options.bufferImporter === 'function') {
25+
bufferImporter = options.bufferImporter
26+
} else {
27+
bufferImporter = require('./buffer-importer')
28+
}
6029

61-
for await (const entry of parallelBatch(importBuffer(file, source, ipld, options), options.blockWriteConcurrency)) {
30+
for await (const entry of parallelBatch(bufferImporter(file, source, ipld, options), options.blockWriteConcurrency)) {
6231
count++
6332

6433
if (count === 0) {
@@ -86,9 +55,8 @@ const reduce = (file, ipld, options) => {
8655
return {
8756
cid: leaf.cid,
8857
path: file.path,
89-
name: (file.path || '').split('/').pop(),
9058
unixfs: leaf.unixfs,
91-
node: leaf.node
59+
size: leaf.size
9260
}
9361
}
9462

@@ -101,7 +69,7 @@ const reduce = (file, ipld, options) => {
10169

10270
const links = leaves
10371
.filter(leaf => {
104-
if (leaf.cid.codec === 'raw' && leaf.node.length) {
72+
if (leaf.cid.codec === 'raw' && leaf.size) {
10573
return true
10674
}
10775

@@ -114,9 +82,9 @@ const reduce = (file, ipld, options) => {
11482
.map((leaf) => {
11583
if (leaf.cid.codec === 'raw') {
11684
// node is a leaf buffer
117-
f.addBlockSize(leaf.node.length)
85+
f.addBlockSize(leaf.size)
11886

119-
return new DAGLink(leaf.name, leaf.node.length, leaf.cid)
87+
return new DAGLink(leaf.name, leaf.size, leaf.cid)
12088
}
12189

12290
if (!leaf.unixfs.data) {
@@ -127,7 +95,7 @@ const reduce = (file, ipld, options) => {
12795
f.addBlockSize(leaf.unixfs.data.length)
12896
}
12997

130-
return new DAGLink(leaf.name, leaf.node.size, leaf.cid)
98+
return new DAGLink(leaf.name, leaf.size, leaf.cid)
13199
})
132100

133101
const node = new DAGNode(f.marshal(), links)
@@ -137,7 +105,6 @@ const reduce = (file, ipld, options) => {
137105
cid,
138106
path: file.path,
139107
unixfs: f,
140-
node,
141108
size: node.size
142109
}
143110
}

src/dag-builder/index.js

+18-4
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
const dirBuilder = require('./dir')
44
const fileBuilder = require('./file')
5-
const createChunker = require('../chunker')
6-
const validateChunks = require('./validate-chunks')
75

86
async function * dagBuilder (source, ipld, options) {
97
for await (const entry of source) {
@@ -30,10 +28,26 @@ async function * dagBuilder (source, ipld, options) {
3028
}
3129
}
3230

33-
const chunker = createChunker(options.chunker, validateChunks(source), options)
31+
let chunker
32+
33+
if (typeof options.chunker === 'function') {
34+
chunker = options.chunker
35+
} else if (options.chunker === 'rabin') {
36+
chunker = require('../chunker/rabin')
37+
} else {
38+
chunker = require('../chunker/fixed-size')
39+
}
40+
41+
let chunkValidator
42+
43+
if (typeof options.chunkValidator === 'function') {
44+
chunkValidator = options.chunkValidator
45+
} else {
46+
chunkValidator = require('./validate-chunks')
47+
}
3448

3549
// item is a file
36-
yield () => fileBuilder(entry, chunker, ipld, options)
50+
yield () => fileBuilder(entry, chunker(chunkValidator(source, options), options), ipld, options)
3751
} else {
3852
// item is a directory
3953
yield () => dirBuilder(entry, ipld, options)

src/dir-flat.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class DirFlat extends Dir {
6565
}
6666
}
6767

68-
links.push(new DAGLink(children[i], child.node.length || child.node.size, child.cid))
68+
links.push(new DAGLink(children[i], child.size, child.cid))
6969
}
7070

7171
const unixfs = new UnixFS({
@@ -84,7 +84,7 @@ class DirFlat extends Dir {
8484
cid,
8585
unixfs,
8686
path,
87-
node
87+
size: node.size
8888
}
8989
}
9090
}

src/dir-sharded.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
107107
shard = subShard
108108
}
109109

110-
links.push(new DAGLink(labelPrefix, shard.node.size, shard.cid))
110+
links.push(new DAGLink(labelPrefix, shard.size, shard.cid))
111111
} else if (typeof child.value.flush === 'function') {
112112
const dir = child.value
113113
let flushedDir
@@ -119,7 +119,7 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
119119
}
120120

121121
const label = labelPrefix + child.key
122-
links.push(new DAGLink(label, flushedDir.node.size, flushedDir.cid))
122+
links.push(new DAGLink(label, flushedDir.size, flushedDir.cid))
123123
} else {
124124
const value = child.value
125125

@@ -155,8 +155,8 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
155155

156156
yield {
157157
cid,
158-
node,
159158
unixfs: dir,
160-
path
159+
path,
160+
size: node.size
161161
}
162162
}

src/index.js

+19-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
'use strict'
22

3-
const dagBuilder = require('./dag-builder')
4-
const treeBuilder = require('./tree-builder')
53
const parallelBatch = require('it-parallel-batch')
64
const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })
75

@@ -30,7 +28,9 @@ const defaultOptions = {
3028
pin: true,
3129
recursive: false,
3230
hidden: false,
33-
preload: true
31+
preload: true,
32+
chunkValidator: null,
33+
importBuffer: null
3434
}
3535

3636
module.exports = async function * (source, ipld, options = {}) {
@@ -58,6 +58,22 @@ module.exports = async function * (source, ipld, options = {}) {
5858
opts.codec = options.format
5959
}
6060

61+
let dagBuilder
62+
63+
if (typeof options.dagBuilder === 'function') {
64+
dagBuilder = options.dagBuilder
65+
} else {
66+
dagBuilder = require('./dag-builder')
67+
}
68+
69+
let treeBuilder
70+
71+
if (typeof options.treeBuilder === 'function') {
72+
treeBuilder = options.treeBuilder
73+
} else {
74+
treeBuilder = require('./tree-builder')
75+
}
76+
6177
for await (const entry of treeBuilder(parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency), ipld, opts)) {
6278
yield {
6379
cid: entry.cid,

0 commit comments

Comments
 (0)