diff --git a/index.js b/index.js index b0ea403..d07ead9 100644 --- a/index.js +++ b/index.js @@ -1,4 +1,4 @@ -const mutexify = require('mutexify') +const mutexify = require('mutexify/promise') const b4a = require('b4a') const { BlobReadStream, BlobWriteStream } = require('./lib/streams') @@ -11,7 +11,6 @@ module.exports = class Hyperblobs { this.blockSize = opts.blockSize || DEFAULT_BLOCK_SIZE this._lock = mutexify() - this._core = core } get feed () { @@ -59,12 +58,12 @@ module.exports = class Hyperblobs { } createReadStream (id, opts) { - const core = (opts && opts.core) ? opts.core : this._core + const core = (opts && opts.core) ? opts.core : this.core return new BlobReadStream(core, id, opts) } createWriteStream (opts) { - const core = (opts && opts.core) ? opts.core : this._core + const core = (opts && opts.core) ? opts.core : this.core return new BlobWriteStream(core, this._lock, opts) } } diff --git a/lib/streams.js b/lib/streams.js index b315290..cffcbad 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -10,6 +10,7 @@ class SparsePrefetcher { prefetch (bounds, index) { const startBlock = bounds.blockOffset const endBlock = startBlock + bounds.blockLength + return { start: Math.max(index, startBlock), end: Math.min(index + this._length, endBlock), @@ -18,50 +19,28 @@ class SparsePrefetcher { } } -/* eslint-disable no-unused-vars */ -class EagerPrefetcher { - constructor (opts = {}) { - this._linear = opts.linear !== false - this._done = false - } - - prefetch (bounds) { - if (this._done) return null - this._done = true - const startBlock = bounds.blockOffset - const endBlock = startBlock + bounds.blockLength - return { start: startBlock, end: endBlock, linear: this._linear } - } -} - class BlobWriteStream extends Writable { constructor (core, lock, opts) { super(opts) - this.id = {} + this.core = core + this.id = { byteOffset: 0, blockOffset: 0, blockLength: 0, byteLength: 0 } + this._lock = lock this._release = null this._batch = [] } _open (cb) { - this.core.ready().then(() => { - this._lock(release => { - this._release = release - this.id.byteOffset = this.core.byteLength - this.id.blockOffset = this.core.length - return cb(null) - }) - }, err => cb(err)) + this._openp().then(cb, cb) + } + + _write (data, cb) { + this._writep(data).then(cb, cb) } _final (cb) { - this._append(err => { - if (err) return cb(err) - this.id.blockLength = this.core.length - this.id.blockOffset - this.id.byteLength = this.core.byteLength - this.id.byteOffset - return cb(null) - }) + this._finalp().then(cb, cb) } _destroy (cb) { @@ -69,29 +48,45 @@ class BlobWriteStream extends Writable { cb(null) } - _append (cb) { - if (!this._batch.length) return cb(null) - return this.core.append(this._batch).then(() => { - this._batch = [] - return cb(null) - }, err => { - this._batch = [] - return cb(err) - }) + async _openp () { + await this.core.ready() + + this._release = await this._lock() + + this.id.byteOffset = this.core.byteLength + this.id.blockOffset = this.core.length } - _write (data, cb) { + async _writep (data) { this._batch.push(data) - if (this._batch.length >= 16) return this._append(cb) - return cb(null) + + if (this._batch.length >= 16) await this._append() + } + + async _finalp () { + await this._append() + + this.id.blockLength = this.core.length - this.id.blockOffset + this.id.byteLength = this.core.byteLength - this.id.byteOffset + } + + async _append () { + if (!this._batch.length) return + + try { + await this.core.append(this._batch) + } finally { + this._batch = [] + } } } class BlobReadStream extends Readable { constructor (core, id, opts = {}) { super(opts) - this.id = id + this.core = core.session({ wait: opts.wait, timeout: opts.timeout }) + this.id = id this._prefetch = opts.wait === false ? noPrefetch : opts.prefetch this._lastPrefetch = null @@ -112,22 +107,11 @@ class BlobReadStream extends Readable { } _open (cb) { - if (this._pos === this.id.byteOffset) { - this._index = this.id.blockOffset - this._relativeOffset = 0 - return cb(null) - } - - this.core.seek(this._pos, { - start: this.id.blockOffset, - end: this.id.blockOffset + this.id.blockLength - }).then(result => { - if (!result) return cb(BLOCK_NOT_AVAILABLE()) + this._openp().then(cb, cb) + } - this._index = result[0] - this._relativeOffset = result[1] - return cb(null) - }, err => cb(err)) + _read (cb) { + this._readp().then(cb, cb) } _predestroy () { @@ -138,10 +122,28 @@ class BlobReadStream extends Readable { this.core.close().then(cb, cb) } - _read (cb) { + async _openp (cb) { + if (this._pos === this.id.byteOffset) { + this._index = this.id.blockOffset + this._relativeOffset = 0 + return + } + + const seek = await this.core.seek(this._pos, { + start: this.id.blockOffset, + end: this.id.blockOffset + this.id.blockLength + }) + + if (!seek) throw BLOCK_NOT_AVAILABLE() + + this._index = seek[0] + this._relativeOffset = seek[1] + } + + async _readp (cb) { if (this._pos >= this._end) { this.push(null) - return cb(null) + return } const prefetch = this._prefetch(this.id, this._index) @@ -150,22 +152,20 @@ class BlobReadStream extends Readable { this._lastPrefetch = this.core.download(prefetch) } - this.core.get(this._index).then(block => { - if (!block) return cb(BLOCK_NOT_AVAILABLE()) + let block = await this.core.get(this._index) + if (!block) throw BLOCK_NOT_AVAILABLE() - const remainder = this._end - this._pos - if (this._relativeOffset || (remainder < block.length)) { - block = block.subarray(this._relativeOffset, this._relativeOffset + remainder) - } + const remainder = this._end - this._pos + if (this._relativeOffset || (remainder < block.length)) { + block = block.subarray(this._relativeOffset, this._relativeOffset + remainder) + } - this._index++ - this._relativeOffset = 0 - this._pos += block.length - this._bytesRead += block.length + this._index++ + this._relativeOffset = 0 + this._pos += block.length + this._bytesRead += block.length - this.push(block) - return cb(null) - }, err => cb(err)) + this.push(block) } } diff --git a/package.json b/package.json index 37140bd..e946376 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "b4a": "^1.6.1", "hypercore-errors": "^1.0.0", "mutexify": "^1.4.0", - "streamx": "^2.13.2" + "streamx": "^2.15.0" }, "devDependencies": { "brittle": "^3.1.0",