Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit f869455

Browse files
achingbrainAlan Shaw
authored and
Alan Shaw
committed
fix: Do not load all of a DAG into memory when pinning (#2387)
Port of #2372 into gc branch to ease merging
1 parent d085a30 commit f869455

File tree

2 files changed

+50
-24
lines changed

2 files changed

+50
-24
lines changed

src/core/components/pin.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ module.exports = (self) => {
247247
}
248248

249249
if (type === PinTypes.indirect || type === PinTypes.all) {
250-
pinManager.getIndirectKeys((err, indirects) => {
250+
pinManager.getIndirectKeys(options, (err, indirects) => {
251251
if (err) { return callback(err) }
252252
pins = pins
253253
// if something is pinned both directly and indirectly,

src/core/components/pin/pin-manager.js

+49-23
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
/* eslint max-nested-callbacks: ["error", 8] */
22
'use strict'
33

4-
const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
4+
const { DAGNode, DAGLink } = require('ipld-dag-pb')
55
const CID = require('cids')
6-
const map = require('async/map')
76
const series = require('async/series')
87
const parallel = require('async/parallel')
98
const eachLimit = require('async/eachLimit')
109
const waterfall = require('async/waterfall')
1110
const detectLimit = require('async/detectLimit')
11+
const queue = require('async/queue')
1212
const { Key } = require('interface-datastore')
1313
const errCode = require('err-code')
1414
const multicodec = require('multicodec')
@@ -43,6 +43,34 @@ class PinManager {
4343
this.recursivePins = new Set()
4444
}
4545

46+
_walkDag ({ cid, preload = false, onCid = () => {} }, cb) {
47+
const q = queue(({ cid }, done) => {
48+
this.dag.get(cid, { preload }, (err, result) => {
49+
if (err) {
50+
return done(err)
51+
}
52+
53+
onCid(cid)
54+
55+
if (result.value.Links) {
56+
q.push(result.value.Links.map(link => ({
57+
cid: link.Hash
58+
})))
59+
}
60+
61+
done()
62+
})
63+
}, concurrencyLimit)
64+
q.drain = () => {
65+
cb()
66+
}
67+
q.error = (err) => {
68+
q.kill()
69+
cb(err)
70+
}
71+
q.push({ cid })
72+
}
73+
4674
directKeys () {
4775
return Array.from(this.directPins, key => new CID(key).buffer)
4876
}
@@ -51,30 +79,21 @@ class PinManager {
5179
return Array.from(this.recursivePins, key => new CID(key).buffer)
5280
}
5381

54-
getIndirectKeys (callback) {
82+
getIndirectKeys ({ preload }, callback) {
5583
const indirectKeys = new Set()
5684
eachLimit(this.recursiveKeys(), concurrencyLimit, (multihash, cb) => {
57-
this.dag._getRecursive(multihash, (err, nodes) => {
58-
if (err) {
59-
return cb(err)
60-
}
61-
62-
map(nodes, (node, cb) => util.cid(util.serialize(node), {
63-
cidVersion: 0
64-
}).then(cid => cb(null, cid), cb), (err, cids) => {
65-
if (err) {
66-
return cb(err)
85+
this._walkDag({
86+
cid: new CID(multihash),
87+
preload: preload || false,
88+
onCid: (cid) => {
89+
cid = cid.toString()
90+
91+
// recursive pins pre-empt indirect pins
92+
if (!this.recursivePins.has(cid)) {
93+
indirectKeys.add(cid)
6794
}
68-
69-
cids
70-
.map(cid => cid.toString())
71-
// recursive pins pre-empt indirect pins
72-
.filter(key => !this.recursivePins.has(key))
73-
.forEach(key => indirectKeys.add(key))
74-
75-
cb()
76-
})
77-
})
95+
}
96+
}, cb)
7897
}, (err) => {
7998
if (err) { return callback(err) }
8099
callback(null, Array.from(indirectKeys))
@@ -283,6 +302,13 @@ class PinManager {
283302
})
284303
}
285304

305+
fetchCompleteDag (cid, options, callback) {
306+
this._walkDag({
307+
cid,
308+
preload: options.preload
309+
}, callback)
310+
}
311+
286312
// Returns an error if the pin type is invalid
287313
static checkPinType (type) {
288314
if (typeof type !== 'string' || !Object.keys(PinTypes).includes(type)) {

0 commit comments

Comments
 (0)