This repository was archived by the owner on Aug 12, 2020. It is now read-only.
feat: Add reader to read files or part of files as streams #202
Merged
New file (+137 lines):

```js
'use strict'

const CID = require('cids')
const pull = require('pull-stream')
const asyncValues = require('pull-async-values')
const asyncMap = require('pull-stream/throughs/async-map')
const map = require('pull-stream/throughs/map')
const UnixFS = require('ipfs-unixfs')
const toB58String = require('multihashes').toB58String
const waterfall = require('async/waterfall')

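// Returns a pull-stream source emitting the file at `path` as Buffers,
// restricted to the byte range [begin, end); negative offsets count back
// from the end of the file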
module.exports = (path, ipldResolver, begin = 0, end) => {
  let streamPosition = 0

  return pull(
    asyncValues((cb) => {
      waterfall([
        (next) => toCid(path, next),
        (cid, next) => ipldResolver.get(cid, next),
        (node, next) => {
          const meta = UnixFS.unmarshal(node.value.data)

          if (meta.type !== 'file') {
            return next(new Error(`Path ${path} was not a file (was ${meta.type}), can only read files`))
          }

          const fileSize = meta.fileSize()

          if (!end || end > fileSize) {
            end = fileSize
          }

          if (begin < 0) {
            begin = fileSize + begin
          }

          if (end < 0) {
            end = fileSize + end
          }

          const links = node.value.links

          if (!links || !links.length) {
            if (meta.data && meta.data.length) {
              // file was small enough to fit in one DAGNode so has no links
              return next(null, [(done) => done(null, meta.data)])
            }

            return next(new Error(`Path ${path} had no links or data`))
          }

          const linkedDataSize = links.reduce((acc, curr) => acc + curr.size, 0)
          const overhead = (linkedDataSize - meta.fileSize()) / links.length
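          // e.g. (illustrative figures) two 262144-byte chunks reported as
          // 262158 bytes each: overhead = (524316 - 524288) / 2 = 14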

          // create an array of functions to fetch link data
          next(null, links.map((link) => (done) => {
            // DAGNode Links report unixfs object data sizes $overhead bytes (typically 14)
            // larger than they actually are due to the protobuf wrapper
            const bytesInLinkedObjectData = link.size - overhead

            if (begin > (streamPosition + bytesInLinkedObjectData)) {
              // Start byte is after this block so skip it
              streamPosition += bytesInLinkedObjectData

              return done()
            }

            if (end < streamPosition) {
              // End byte was before this block so skip it
              streamPosition += bytesInLinkedObjectData

              return done()
            }

            // transform the multihash to a cid, the cid to a node and the node to some data
            waterfall([
              (next) => toCid(link.multihash, next),
              (cid, next) => ipldResolver.get(cid, next),
              (node, next) => next(null, node.value.data),
              (data, next) => next(null, UnixFS.unmarshal(data).data)
            ], done)
          }))
        }
      ], cb)
    }),
    asyncMap((loadLinkData, cb) => loadLinkData(cb)),
    pull.filter(Boolean),
    map((data) => {
      const block = extractDataFromBlock(data, streamPosition, begin, end)

      streamPosition += data.length

      return block
    })
  )
}

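// Converts `input` - a multihash Buffer, a base58 string or an /ipfs/-prefixed
// path - into a CID, reporting invalid input via the callback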
function toCid (input, callback) {
  let path = input
  let cid

  try {
    if (Buffer.isBuffer(path)) {
      path = toB58String(path)
    }

    if (path.indexOf('/ipfs/') === 0) {
      path = path.substring('/ipfs/'.length)
    }

    if (path.charAt(path.length - 1) === '/') {
      path = path.substring(0, path.length - 1)
    }

    cid = new CID(path)
  } catch (error) {
    return callback(new Error(`Path '${input}' was invalid: ${error.message}`))
  }

  callback(null, cid)
}

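// Clips `block` to the requested byte range [begin, end), given the number of
// bytes already emitted (streamPosition)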
function extractDataFromBlock (block, streamPosition, begin, end) {
  const blockLength = block.length

  if (end - streamPosition < blockLength) {
    // If the end byte is in the current block, truncate the block to the end byte
    block = block.slice(0, end - streamPosition)
  }

  if (begin > streamPosition && begin < (streamPosition + blockLength)) {
    // If the start byte is in the current block, skip to the start byte
    block = block.slice(begin - streamPosition)
  }

  return block
}
```
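For context, here is a hypothetical usage sketch of the new reader. The require path, CID and `ipldResolver` are placeholders rather than anything from this PR:

```js
const pull = require('pull-stream')
const createReader = require('./reader') // hypothetical path to this module

// `ipldResolver` is assumed to already be constructed and in scope.
// Read bytes 5 up to (but not including) 10 of the file at the given path.
pull(
  createReader('/ipfs/QmPlaceholderHash', ipldResolver, 5, 10),
  pull.collect((err, buffers) => {
    if (err) throw err

    // buffers is an array of Buffers covering the requested byte range
    console.log(Buffer.concat(buffers).toString())
  })
)
```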
Interesting, didn't know about pull-async-values.

Couldn't this be done instead with something like a `pull.values([path])` and then a chain of `pull.map` and `pull.asyncMap`? I'm asking because this way we wouldn't need to add an extra dependency.
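Something like this hypothetical sketch (`loadNode` and `nodeToData` are made-up stand-ins):

```js
const pull = require('pull-stream')

pull(
  pull.values([path]),                              // start from the single path value
  pull.asyncMap((path, cb) => loadNode(path, cb)),  // resolve it to a DAG node asynchronously
  pull.map((node) => nodeToData(node)),             // map each node to its unixfs data
  pull.collect((err, data) => { /* ... */ })
)
```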
I couldn't find an easy way to asynchronously supply an array of values to pull-stream when you don't know the length of the values ahead of time using only the built-in functions. The FAQ recommends using external modules for 1:many transforms and async steps, so I'm not sure it's possible without adding an extra dep.

That said, the source for pull-async-values is pretty small - I can pull it in to this repo if required.
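For reference, a minimal sketch of the behaviour pull-async-values provides (not the module's actual source): a pull-stream source fed by a single async callback that supplies an array of values, which are then emitted one at a time.

```js
// Minimal sketch, not the real pull-async-values implementation
function asyncValues (getValues) {
  let values = null
  let index = 0

  // pull-stream source: called with (abort, cb) for each value requested
  return (abort, cb) => {
    if (abort) return cb(abort)

    const emitNext = () => {
      if (index >= values.length) return cb(true) // signal end of stream
      cb(null, values[index++])
    }

    if (values) return emitNext()

    // first request: load the values asynchronously, then start emitting
    getValues((err, result) => {
      if (err) return cb(err)
      values = result
      emitNext()
    })
  }
}
```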
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@achingbrain Thanks for explaining, it's fine as it is by me :)