Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit f357c28

Browse files
achingbrainAlan Shaw
authored and
Alan Shaw
committed
fix: do not load all of a DAG into memory when pinning (#2372)
Given a `CID`, the `dag. _getRecursive` method returns a list of all descendents of the node with the passed `CID`. This can cause enormous memory usage when importing large datasets. Where this method is invoked the results are either a) disgarded or b) used to calculate the `CID`s of the nodes which is then bad for memory *and* CPU usage. This PR removes the buffering and `CID` recalculating for a nice speedup when adding large datasets. In my (non-representative, may need all the other unfinished async/iterator stuff) testing, importing folder of 4MB files totalling about 5GB files with content from `/dev/urandom` into a fresh repo with a daemon running in the background is now: ``` go-ipfs real 3m43.741s user 0m31.955s sys 0m31.959s ``` ``` js-ipfs real 3m40.725s user 0m7.352s sys 0m4.489s ``` Which is nice. fixes #2310
1 parent 7e02140 commit f357c28

File tree

4 files changed

+54
-64
lines changed

4 files changed

+54
-64
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
include:
2626
- stage: check
2727
script:
28-
- npx aegir build --bundlesize
28+
# - npx aegir build --bundlesize
2929
- npx aegir dep-check -- -i wrtc -i electron-webrtc
3030
- npm run lint
3131

package.json

-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@
118118
"is-pull-stream": "~0.0.0",
119119
"is-stream": "^2.0.0",
120120
"iso-url": "~0.4.6",
121-
"just-flatten-it": "^2.1.0",
122121
"just-safe-set": "^2.1.0",
123122
"kind-of": "^6.0.2",
124123
"libp2p": "~0.25.4",

src/core/components/dag.js

-34
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ const promisify = require('promisify-es6')
44
const CID = require('cids')
55
const pull = require('pull-stream')
66
const iterToPull = require('async-iterator-to-pull-stream')
7-
const mapAsync = require('async/map')
87
const setImmediate = require('async/setImmediate')
9-
const flattenDeep = require('just-flatten-it')
108
const errCode = require('err-code')
119
const multicodec = require('multicodec')
1210

@@ -180,38 +178,6 @@ module.exports = function dag (self) {
180178
iterToPull(self._ipld.tree(cid, path, options)),
181179
pull.collect(callback)
182180
)
183-
}),
184-
185-
// TODO - use IPLD selectors once they are implemented
186-
_getRecursive: promisify((multihash, options, callback) => {
187-
// gets flat array of all DAGNodes in tree given by multihash
188-
189-
if (typeof options === 'function') {
190-
callback = options
191-
options = {}
192-
}
193-
194-
options = options || {}
195-
196-
let cid
197-
198-
try {
199-
cid = new CID(multihash)
200-
} catch (err) {
201-
return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID')))
202-
}
203-
204-
self.dag.get(cid, '', options, (err, res) => {
205-
if (err) { return callback(err) }
206-
207-
mapAsync(res.value.Links, (link, cb) => {
208-
self.dag._getRecursive(link.Hash, options, cb)
209-
}, (err, nodes) => {
210-
// console.log('nodes:', nodes)
211-
if (err) return callback(err)
212-
callback(null, flattenDeep([res.value, nodes]))
213-
})
214-
})
215181
})
216182
}
217183
}

src/core/components/pin.js

+53-28
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
'use strict'
33

44
const promisify = require('promisify-es6')
5-
const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
5+
const { DAGNode, DAGLink } = require('ipld-dag-pb')
66
const CID = require('cids')
77
const map = require('async/map')
88
const mapSeries = require('async/mapSeries')
@@ -12,6 +12,7 @@ const eachLimit = require('async/eachLimit')
1212
const waterfall = require('async/waterfall')
1313
const detectLimit = require('async/detectLimit')
1414
const setImmediate = require('async/setImmediate')
15+
const queue = require('async/queue')
1516
const { Key } = require('interface-datastore')
1617
const errCode = require('err-code')
1718
const multibase = require('multibase')
@@ -52,30 +53,50 @@ module.exports = (self) => {
5253
const recursiveKeys = () =>
5354
Array.from(recursivePins).map(key => new CID(key).buffer)
5455

55-
function getIndirectKeys (callback) {
56-
const indirectKeys = new Set()
57-
eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
58-
dag._getRecursive(multihash, (err, nodes) => {
56+
function walkDag ({ cid, preload = false, onCid = () => {} }, cb) {
57+
const q = queue(function ({ cid }, done) {
58+
dag.get(cid, { preload }, function (err, result) {
5959
if (err) {
60-
return cb(err)
60+
return done(err)
6161
}
6262

63-
map(nodes, (node, cb) => util.cid(util.serialize(node), {
64-
cidVersion: 0
65-
}).then(cid => cb(null, cid), cb), (err, cids) => {
66-
if (err) {
67-
return cb(err)
68-
}
63+
onCid(cid)
6964

70-
cids
71-
.map(cid => cid.toString())
72-
// recursive pins pre-empt indirect pins
73-
.filter(key => !recursivePins.has(key))
74-
.forEach(key => indirectKeys.add(key))
65+
if (result.value.Links) {
66+
q.push(result.value.Links.map(link => ({
67+
cid: link.Hash
68+
})))
69+
}
7570

76-
cb()
77-
})
71+
done()
7872
})
73+
}, concurrencyLimit)
74+
q.drain = () => {
75+
cb()
76+
}
77+
q.error = (err) => {
78+
q.kill()
79+
cb(err)
80+
}
81+
q.push({ cid })
82+
}
83+
84+
function getIndirectKeys ({ preload }, callback) {
85+
const indirectKeys = new Set()
86+
eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
87+
// load every hash in the graph
88+
walkDag({
89+
cid: new CID(multihash),
90+
preload: preload || false,
91+
onCid: (cid) => {
92+
cid = cid.toString()
93+
94+
// recursive pins pre-empt indirect pins
95+
if (!recursivePins.has(cid)) {
96+
indirectKeys.add(cid)
97+
}
98+
}
99+
}, cb)
79100
}, (err) => {
80101
if (err) { return callback(err) }
81102
callback(null, Array.from(indirectKeys))
@@ -184,7 +205,9 @@ module.exports = (self) => {
184205

185206
// verify that each hash can be pinned
186207
map(mhs, (multihash, cb) => {
187-
const key = toB58String(multihash)
208+
const cid = new CID(multihash)
209+
const key = cid.toBaseEncodedString()
210+
188211
if (recursive) {
189212
if (recursivePins.has(key)) {
190213
// it's already pinned recursively
@@ -193,11 +216,11 @@ module.exports = (self) => {
193216

194217
// entire graph of nested links should be pinned,
195218
// so make sure we have all the objects
196-
dag._getRecursive(key, { preload: options.preload }, (err) => {
197-
if (err) { return cb(err) }
198-
// found all objects, we can add the pin
199-
return cb(null, key)
200-
})
219+
walkDag({
220+
dag,
221+
cid,
222+
preload: options.preload
223+
}, (err) => cb(err, key))
201224
} else {
202225
if (recursivePins.has(key)) {
203226
// recursive supersedes direct, can't have both
@@ -209,8 +232,10 @@ module.exports = (self) => {
209232
}
210233

211234
// make sure we have the object
212-
dag.get(new CID(multihash), { preload: options.preload }, (err) => {
213-
if (err) { return cb(err) }
235+
dag.get(cid, { preload: options.preload }, (err) => {
236+
if (err) {
237+
return cb(err)
238+
}
214239
// found the object, we can add the pin
215240
return cb(null, key)
216241
})
@@ -374,7 +399,7 @@ module.exports = (self) => {
374399
)
375400
}
376401
if (type === types.indirect || type === types.all) {
377-
getIndirectKeys((err, indirects) => {
402+
getIndirectKeys(options, (err, indirects) => {
378403
if (err) { return callback(err) }
379404
pins = pins
380405
// if something is pinned both directly and indirectly,

0 commit comments

Comments
 (0)