Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 2dea4c9

Browse files
committed
chore: wip
1 parent 494235d commit 2dea4c9

File tree

5 files changed

+73
-97
lines changed

5 files changed

+73
-97
lines changed

src/core/components/files-regular/ls.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const callbackify = require('callbackify')
44
const all = require('async-iterator-all')
55

66
module.exports = function (self) {
7-
return callbackify(async function ls (ipfsPath, options) { // eslint-disable-line require-await
7+
return callbackify.variadic(async function ls (ipfsPath, options) { // eslint-disable-line require-await
88
return all(self._lsAsyncIterator(ipfsPath, options))
99
})
1010
}

src/core/components/files-regular/refs-async-iterator.js

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ function getFullPath (ipfs, ipfsPath, options) {
4141
const path = normalizePath(ipfsPath)
4242
const pathComponents = path.split('/')
4343
const cid = pathComponents[0]
44+
4445
if (!isIpfs.cid(cid)) {
4546
throw new Error(`Error resolving path '${path}': '${cid}' is not a valid CID`)
4647
}
@@ -89,43 +90,33 @@ function formatLink (srcCid, dstCid, linkName, format) {
8990
}
9091

9192
// Do a depth first search of the DAG, starting from the given root cid
92-
async function * objectStream (ipfs, rootCid, maxDepth, isUnique) {
93-
const uniques = new Set()
93+
async function * objectStream (ipfs, rootCid, maxDepth, uniqueOnly) {
94+
const seen = new Set()
9495

95-
async function * traverseLevel (obj) {
96-
const { node, depth } = obj
96+
async function * traverseLevel (parent, depth) {
97+
const nextLevelDepth = depth + 1
9798

9899
// Check the depth
99-
const nextLevelDepth = depth + 1
100100
if (nextLevelDepth > maxDepth) {
101101
return
102102
}
103103

104-
// If unique option is enabled, check if the CID has been seen before.
105-
// Note we need to do this here rather than before adding to the stream
106-
// so that the unique check happens in the order that items are examined
107-
// in the DAG.
108-
if (isUnique) {
109-
if (uniques.has(node.cid.toString())) {
110-
// Mark this object as a duplicate so we can filter it out later
111-
obj.isDuplicate = true
112-
return
113-
}
114-
uniques.add(node.cid.toString())
115-
}
116-
117104
// Get this object's links
118105
try {
119-
// Add to the stream each link, parent and the new depth
120-
for (const link of await getLinks(ipfs, node.cid)) {
121-
const child = {
122-
parent: node,
106+
// Look at each link, parent and the new depth
107+
for (const link of await getLinks(ipfs, parent.cid)) {
108+
109+
yield {
110+
parent: parent,
123111
node: link,
124-
depth: nextLevelDepth
112+
isDuplicate: uniqueOnly && seen.has(link.cid.toString())
113+
}
114+
115+
if (uniqueOnly) {
116+
seen.add(link.cid.toString())
125117
}
126118

127-
yield child
128-
yield * await traverseLevel(child)
119+
yield * await traverseLevel(link, nextLevelDepth)
129120
}
130121
} catch (err) {
131122
if (err.code === ERR_NOT_FOUND) {
@@ -136,7 +127,7 @@ async function * objectStream (ipfs, rootCid, maxDepth, isUnique) {
136127
}
137128
}
138129

139-
yield * await traverseLevel({ node: { cid: rootCid }, depth: 0 })
130+
yield * await traverseLevel({ cid: rootCid }, 0)
140131
}
141132

142133
// Fetch a node from IPLD then get all its links

src/core/components/files-regular/refs-local.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const callbackify = require('callbackify')
44
const all = require('async-iterator-all')
55

66
module.exports = function (self) {
7-
return callbackify(async function refsLocal (ipfsPath, options) { // eslint-disable-line require-await
7+
return callbackify.variadic(async function refsLocal (ipfsPath, options) { // eslint-disable-line require-await
88
return all(self.refs._localAsyncIterator(ipfsPath, options))
99
})
1010
}

src/core/components/files-regular/refs.js

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,11 @@
11
'use strict'
22

3-
const promisify = require('promisify-es6')
4-
const pull = require('pull-stream')
3+
const callbackify = require('callbackify')
4+
const all = require('async-iterator-all')
55

66
module.exports = function (self) {
7-
return promisify((ipfsPath, options, callback) => {
8-
if (typeof options === 'function') {
9-
callback = options
10-
options = {}
11-
}
12-
13-
options = options || {}
14-
15-
pull(
16-
self.refsPullStream(ipfsPath, options),
17-
pull.collect((err, values) => {
18-
if (err) {
19-
return callback(err)
20-
}
21-
callback(null, values)
22-
})
23-
)
7+
return callbackify.variadic(async function refs (ipfsPath, options) {
8+
return all(self._refsAsyncIterator(ipfsPath, options))
249
})
2510
}
2611

src/http/api/resources/files-regular.js

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -330,69 +330,69 @@ exports.refs = {
330330
const { ipfs } = request.server.app
331331
const { key } = request.pre.args
332332

333-
const recursive = request.query.recursive
334-
const format = request.query.format
335-
const edges = request.query.edges
336-
const unique = request.query.unique
337-
const maxDepth = request.query['max-depth']
338-
339-
const source = ipfs.refsPullStream(key, { recursive, format, edges, unique, maxDepth })
340-
return sendRefsReplyStream(request, h, `refs for ${key}`, source)
333+
const options = {
334+
recursive: request.query.recursive,
335+
format: request.query.format,
336+
edges: request.query.edges,
337+
unique: request.query.unique,
338+
maxDepth: request.query['max-depth']
339+
}
340+
341+
if (options.edges && options.format && options.format !== Format.default) {
342+
throw new Error('Cannot set edges to true and also specify format')
343+
}
344+
345+
return streamResponse(request, h, async (output) => {
346+
for await (const ref of ipfs._refsAsyncIterator(key, options)) {
347+
output.write(
348+
JSON.stringify({
349+
Ref: ref.ref,
350+
Err: ref.err
351+
}) + '\n'
352+
)
353+
}
354+
})
341355
}
342356
}
343357

344358
exports.refs.local = {
345359
// main route handler
346360
handler (request, h) {
347361
const { ipfs } = request.server.app
348-
const source = ipfs.refs.localPullStream()
349-
return sendRefsReplyStream(request, h, 'local refs', source)
362+
363+
return streamResponse(request, h, async (output) => {
364+
for await (const ref of ipfs.refs._localAsyncIterator()) {
365+
output.write(
366+
JSON.stringify({
367+
Ref: ref.ref,
368+
Err: ref.err
369+
}) + '\n'
370+
)
371+
}
372+
})
350373
}
351374
}
352375

353-
function sendRefsReplyStream (request, h, desc, source) {
354-
const replyStream = pushable()
355-
const aborter = abortable()
356-
357-
const stream = toStream.source(pull(
358-
replyStream,
359-
aborter,
360-
ndjson.serialize()
361-
))
362-
363-
// const stream = toStream.source(replyStream.source)
364-
// hapi is not very clever and throws if no
365-
// - _read method
366-
// - _readableState object
367-
// are there :(
368-
if (!stream._read) {
369-
stream._read = () => {}
370-
stream._readableState = {}
371-
stream.unpipe = () => {}
372-
}
376+
function streamResponse (request, h, fn) {
377+
const output = new PassThrough()
378+
const errorTrailer = 'X-Stream-Error'
373379

374-
pull(
375-
source,
376-
pull.drain(
377-
(ref) => replyStream.push({ Ref: ref.ref, Err: ref.err }),
378-
(err) => {
379-
if (err) {
380-
request.raw.res.addTrailers({
381-
'X-Stream-Error': JSON.stringify({
382-
Message: `Failed to get ${desc}: ${err.message || ''}`,
383-
Code: 0
384-
})
380+
Promise.resolve()
381+
.then(() => fn(output))
382+
.catch(err => {
383+
request.raw.res.addTrailers({
384+
[errorTrailer]: JSON.stringify({
385+
Message: err.message,
386+
Code: 0
385387
})
386-
return aborter.abort()
387-
}
388-
389-
replyStream.end()
390-
}
391-
)
392-
)
388+
})
389+
})
390+
.finally(() => {
391+
output.end()
392+
})
393393

394-
return h.response(stream)
394+
return h.response(output)
395395
.header('x-chunked-output', '1')
396396
.header('content-type', 'application/json')
397-
.header('Trailer', 'X-Stream-Error')
397+
.header('Trailer', errorTrailer)
398398
}

0 commit comments

Comments
 (0)