1
1
/* eslint max-nested-callbacks: ["error", 8] */
2
2
'use strict'
3
3
4
- const { DAGNode, DAGLink, util } = require ( 'ipld-dag-pb' )
4
+ const { DAGNode, DAGLink } = require ( 'ipld-dag-pb' )
5
5
const CID = require ( 'cids' )
6
- const map = require ( 'async/map' )
7
6
const series = require ( 'async/series' )
8
7
const parallel = require ( 'async/parallel' )
9
8
const eachLimit = require ( 'async/eachLimit' )
10
9
const waterfall = require ( 'async/waterfall' )
11
10
const detectLimit = require ( 'async/detectLimit' )
11
+ const queue = require ( 'async/queue' )
12
12
const { Key } = require ( 'interface-datastore' )
13
13
const errCode = require ( 'err-code' )
14
14
const multicodec = require ( 'multicodec' )
@@ -43,6 +43,34 @@ class PinManager {
43
43
this . recursivePins = new Set ( )
44
44
}
45
45
46
+ _walkDag ( { cid, preload = false , onCid = ( ) => { } } , cb ) {
47
+ const q = queue ( function ( { cid } , done ) {
48
+ this . dag . get ( cid , { preload } , function ( err , result ) {
49
+ if ( err ) {
50
+ return done ( err )
51
+ }
52
+
53
+ onCid ( cid )
54
+
55
+ if ( result . value . Links ) {
56
+ q . push ( result . value . Links . map ( link => ( {
57
+ cid : link . Hash
58
+ } ) ) )
59
+ }
60
+
61
+ done ( )
62
+ } )
63
+ } , concurrencyLimit )
64
+ q . drain = ( ) => {
65
+ cb ( )
66
+ }
67
+ q . error = ( err ) => {
68
+ q . kill ( )
69
+ cb ( err )
70
+ }
71
+ q . push ( { cid } )
72
+ }
73
+
46
74
directKeys ( ) {
47
75
return Array . from ( this . directPins , key => new CID ( key ) . buffer )
48
76
}
@@ -51,30 +79,21 @@ class PinManager {
51
79
return Array . from ( this . recursivePins , key => new CID ( key ) . buffer )
52
80
}
53
81
54
- getIndirectKeys ( callback ) {
82
+ getIndirectKeys ( { preload } , callback ) {
55
83
const indirectKeys = new Set ( )
56
84
eachLimit ( this . recursiveKeys ( ) , concurrencyLimit , ( multihash , cb ) => {
57
- this . dag . _getRecursive ( multihash , ( err , nodes ) => {
58
- if ( err ) {
59
- return cb ( err )
60
- }
61
-
62
- map ( nodes , ( node , cb ) => util . cid ( util . serialize ( node ) , {
63
- cidVersion : 0
64
- } ) . then ( cid => cb ( null , cid ) , cb ) , ( err , cids ) => {
65
- if ( err ) {
66
- return cb ( err )
85
+ this . _walkDag ( {
86
+ cid : new CID ( multihash ) ,
87
+ preload : preload || false ,
88
+ onCid : ( cid ) => {
89
+ cid = cid . toString ( )
90
+
91
+ // recursive pins pre-empt indirect pins
92
+ if ( ! this . recursivePins . has ( cid ) ) {
93
+ indirectKeys . add ( cid )
67
94
}
68
-
69
- cids
70
- . map ( cid => cid . toString ( ) )
71
- // recursive pins pre-empt indirect pins
72
- . filter ( key => ! this . recursivePins . has ( key ) )
73
- . forEach ( key => indirectKeys . add ( key ) )
74
-
75
- cb ( )
76
- } )
77
- } )
95
+ }
96
+ } , cb )
78
97
} , ( err ) => {
79
98
if ( err ) { return callback ( err ) }
80
99
callback ( null , Array . from ( indirectKeys ) )
@@ -283,6 +302,13 @@ class PinManager {
283
302
} )
284
303
}
285
304
305
+ fetchCompleteDag ( cid , options , callback ) {
306
+ this . _walkDag ( {
307
+ cid,
308
+ preload : options . preload
309
+ } , callback )
310
+ }
311
+
286
312
// Returns an error if the pin type is invalid
287
313
static checkPinType ( type ) {
288
314
if ( typeof type !== 'string' || ! Object . keys ( PinTypes ) . includes ( type ) ) {
0 commit comments