From be85505fb374893ee36ef7d41b558ff057e0d50f Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Fri, 24 May 2019 20:58:46 +0100
Subject: [PATCH] feat: adds js implementation of rabin chunker for windows and browser

---
 package.json                       |   1 +
 src/chunker/rabin.js               | 238 ++++++++++++++++++++++++++---
 src/index.js                       |   5 +-
 test/chunker-fixed-size.spec.js    |   6 -
 test/chunker-rabin-browser.spec.js |  25 ----
 test/chunker-rabin.spec.js         |  43 +++---
 6 files changed, 239 insertions(+), 79 deletions(-)
 delete mode 100644 test/chunker-rabin-browser.spec.js

diff --git a/package.json b/package.json
index 2777f36..7a9c538 100644
--- a/package.json
+++ b/package.json
@@ -62,6 +62,7 @@
     "hamt-sharding": "~0.0.2",
     "ipfs-unixfs": "~0.1.16",
     "ipld-dag-pb": "~0.17.2",
+    "long": "^4.0.0",
     "multicodec": "~0.5.1",
     "multihashing-async": "~0.7.0",
     "superstruct": "~0.6.1"
diff --git a/src/chunker/rabin.js b/src/chunker/rabin.js
index ca130e6..1e90369 100644
--- a/src/chunker/rabin.js
+++ b/src/chunker/rabin.js
@@ -1,19 +1,17 @@
 'use strict'
 
 const errCode = require('err-code')
-
-let createRabin
+const Long = require('long')
+const BufferList = require('bl')
+let rabin
 
 module.exports = async function * rabinChunker (source, options) {
-  if (!createRabin) {
+  if (!rabin) {
     try {
-      createRabin = require('rabin')
-
-      if (typeof createRabin !== 'function') {
-        throw errCode(new Error(`createRabin was not a function`), 'ERR_UNSUPPORTED')
-      }
-    } catch (err) {
-      throw errCode(new Error(`Rabin chunker not available, it may have failed to install or not be supported on this platform`), 'ERR_UNSUPPORTED')
+      rabin = nativeRabin()
+    } catch (_) {
+      // fallback to js implementation
+      rabin = jsRabin()
     }
   }
 
@@ -30,30 +28,222 @@ module.exports = async function * rabinChunker (source, options) {
   }
 
   const sizepow = Math.floor(Math.log2(avg))
-  const rabin = createRabin({
+
+  for await (const chunk of rabin(source, {
     min: min,
     max: max,
     bits: sizepow,
     window: options.window,
     polynomial: options.polynomial
-  })
+  })) {
+    yield chunk
+  }
+}
+
+// Requires the native `rabin` module and returns an async generator function
+// that chunks `source` with it. Throws when the module cannot be required or
+// is not a function; rabinChunker catches that and falls back to jsRabin.
+const nativeRabin = () => {
+  const createRabin = require('rabin')
+
+  // validate the module just required - the module-level `rabin` is still unset here
+  if (typeof createRabin !== 'function') {
+    throw errCode(new Error(`rabin was not a function`), 'ERR_UNSUPPORTED')
+  }
+
+  return async function * (source, options) {
+    const rabin = createRabin(options)
+
+    // TODO: rewrite rabin using node streams v3
+    for await (const chunk of source) {
+      rabin.buffers.append(chunk)
+      rabin.pending.push(chunk)
+
+      const sizes = []
+
+      rabin.rabin.fingerprint(rabin.pending, sizes)
+      rabin.pending = []
+
+      for (let i = 0; i < sizes.length; i++) {
+        const size = sizes[i]
+        const buf = rabin.buffers.slice(0, size)
+        rabin.buffers.consume(size)
+
+        yield buf
+      }
+    }
+
+    if (rabin.buffers.length) {
+      yield rabin.buffers.slice(0)
+    }
+  }
+}
+
+// Returns an async generator function with the same (source, options) signature
+// as the native one, implemented in JS using `long` for 64-bit integer maths.
+const jsRabin = () => {
+  // see https://github.com/datproject/rabin/blob/c0378395dc0a125ab21ac176ec504f9995b34e62/src/rabin.cc
+  class Rabin {
+    constructor (options) {
+      this.window = new Array(options.window || 64).fill(Long.fromInt(0))
+      this.wpos = 0
+      this.count = 0
+      this.digest = Long.fromInt(0)
+      this.chunkLength = 0
+      this.polynomial = options.polynomial
+      this.polynomialDegree = 53
+      this.polynomialShift = this.polynomialDegree - 8
+      this.averageBits = options.bits || 12
+      this.minSize = options.min || 8 * 1024
+      this.maxSize = options.max || 32 * 1024
+      this.mask = Long.fromInt(1).shiftLeft(this.averageBits).subtract(1)
+      this.modTable = []
+      this.outTable = []
+
+      this.calculateTables()
+    }
+
+    calculateTables () {
+      for (let i = 0; i < 256; i++) {
+        let hash = Long.fromInt(0, true)
+
+        hash = this.appendByte(hash, i)
+
+        for (let j = 0; j < this.window.length - 1; j++) {
+          hash = this.appendByte(hash, 0)
+        }
+
+        this.outTable[i] = hash
+      }
+
+      const k = this.deg(this.polynomial)
+
+      for (let i = 0; i < 256; i++) {
+        const b = Long.fromInt(i, true)
+
+        this.modTable[i] = b.shiftLeft(k)
+          .modulo(this.polynomial)
+          .or(b.shiftLeft(k))
+      }
+    }
+
+    deg (p) {
+      let mask = Long.fromString('0x8000000000000000', true, 16)
+
+      for (let i = 0; i < 64; i++) {
+        if (mask.and(p).greaterThan(0)) {
+          return Long.fromInt(63 - i)
+        }
+
+        mask = mask.shiftRight(1)
+      }
 
-  // TODO: rewrite rabin using node streams v3
-  for await (const chunk of source) {
-    rabin.buffers.append(chunk)
-    rabin.pending.push(chunk)
+      return Long.fromInt(-1)
+    }
+
+    appendByte (hash, b) {
+      hash = hash.shiftLeft(8)
+      hash = hash.or(b)
+
+      return hash.modulo(this.polynomial)
+    }
+
+    getFingerprints (bufs) {
+      const lengths = []
+
+      for (let i = 0; i < bufs.length; i++) {
+        let buf = bufs[i]
+
+        while (true) {
+          const remaining = this.nextChunk(buf)
+
+          if (remaining < 0) {
+            break
+          }
+
+          buf = buf.slice(remaining)
+
+          lengths.push(this.chunkLength)
+        }
+      }
+
+      return lengths
+    }
+
+    nextChunk (buf) {
+      for (let i = 0; i < buf.length; i++) {
+        const val = Long.fromInt(buf[i])
+
+        this.slide(val)
+
+        this.count++
+
+        if ((this.count >= this.minSize && this.digest.and(this.mask).equals(0)) || this.count >= this.maxSize) {
+          this.chunkLength = this.count
+
+          this.reset()
+
+          return i + 1
+        }
+      }
+
+      return -1
+    }
+
+    slide (value) {
+      const out = this.window[this.wpos].toInt() & 255
+      this.window[this.wpos] = value
+      this.digest = this.digest.xor(this.outTable[out])
+      this.wpos = (this.wpos + 1) % this.window.length
+
+      this.append(value)
+    }
+
+    reset () {
+      this.window = this.window.map(() => Long.fromInt(0))
+      this.wpos = 0
+      this.count = 0
+      this.digest = Long.fromInt(0)
 
-    const sizes = []
+      this.slide(Long.fromInt(1))
+    }
 
-    rabin.rabin.fingerprint(rabin.pending, sizes)
-    rabin.pending = []
+    append (value) {
+      const index = this.digest.shiftRight(this.polynomialShift).toInt() & 255
+      this.digest = this.digest.shiftLeft(8)
+      this.digest = this.digest.or(value)
 
-    for (let i = 0; i < sizes.length; i++) {
-      const size = sizes[i]
-      const buf = rabin.buffers.slice(0, size)
-      rabin.buffers.consume(size)
+      const entry = this.modTable[index]
+
+      if (entry) {
+        this.digest = this.digest.xor(entry)
+      }
+    }
+  }
+
+  return async function * (source, options) {
+    const r = new Rabin(options)
+    const buffers = new BufferList()
+    let pending = []
+
+    for await (const chunk of source) {
+      buffers.append(chunk)
+      pending.push(chunk)
+
+      const sizes = r.getFingerprints(pending)
+      pending = []
+
+      for (let i = 0; i < sizes.length; i++) {
+        var size = sizes[i]
+        var buf = buffers.slice(0, size)
+        buffers.consume(size)
+
+        yield buf
+      }
+    }
 
-      yield buf
+    if (buffers.length) {
+      yield buffers.slice(0)
     }
   }
 }
diff --git a/src/index.js b/src/index.js
index ac1cde7..4ff621d 100644
--- a/src/index.js
+++ b/src/index.js
@@ -18,12 +18,13 @@ const ChunkerOptions = struct({
   maxChunkSize: 'number?',
   avgChunkSize: 'number?',
   window: 'number?',
-  polynomial: 'string?'
+  polynomial: 'number?'
 }, {
   maxChunkSize: 262144,
   avgChunkSize: 262144,
   window: 16,
-  polynomial: '0x3DF305DFB2A805'
+  // NOTE: this value is above Number.MAX_SAFE_INTEGER, so it cannot be stored exactly as a JS number
+  polynomial: 17437180132763653 // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
 })
 
 const BuilderOptions = struct({
diff --git a/test/chunker-fixed-size.spec.js b/test/chunker-fixed-size.spec.js
index d5d02a5..f4b05ba 100644
--- a/test/chunker-fixed-size.spec.js
+++ b/test/chunker-fixed-size.spec.js
@@ -13,12 +13,6 @@ const rawFile = loadFixture((isNode ? __dirname : 'test') + '/fixtures/1MiB.txt'
 describe('chunker: fixed size', function () {
   this.timeout(30000)
 
-  before(function () {
-    if (!isNode) {
-      this.skip()
-    }
-  })
-
   it('chunks non flat buffers', async () => {
     const b1 = Buffer.alloc(2 * 256)
     const b2 = Buffer.alloc(1 * 256)
diff --git a/test/chunker-rabin-browser.spec.js b/test/chunker-rabin-browser.spec.js
deleted file mode 100644
index abac661..0000000
--- a/test/chunker-rabin-browser.spec.js
+++ /dev/null
@@ -1,25 +0,0 @@
-/* eslint-env mocha */
-'use strict'
-
-const chunker = require('../src/chunker/rabin')
-const chai = require('chai')
-chai.use(require('dirty-chai'))
-const expect = chai.expect
-const isNode = require('detect-node')
-const all = require('async-iterator-all')
-
-describe('chunker: rabin browser', () => {
-  before(function () {
-    if (isNode) {
-      this.skip()
-    }
-  })
-
-  it('returns an error', async () => {
-    try {
-      await all(chunker())
-    } catch (err) {
-      expect(err.code).to.equal('ERR_UNSUPPORTED')
-    }
-  })
-})
diff --git a/test/chunker-rabin.spec.js b/test/chunker-rabin.spec.js
index be61304..4ac92bf 100644
--- a/test/chunker-rabin.spec.js
+++ b/test/chunker-rabin.spec.js
@@ -6,7 +6,6 @@ const chai = require('chai')
 chai.use(require('dirty-chai'))
 const expect = chai.expect
 const loadFixture = require('aegir/fixtures')
-const os = require('os')
 const isNode = require('detect-node')
 const all = require('async-iterator-all')
 
@@ -15,15 +14,11 @@ const rawFile = loadFixture((isNode ? __dirname : 'test') + '/fixtures/1MiB.txt'
 describe('chunker: rabin', function () {
   this.timeout(30000)
 
-  before(function () {
-    if (os.platform() === 'win32') {
-      return this.skip()
-    }
-
-    if (!isNode) {
-      this.skip()
-    }
-  })
+  const defaultOptions = {
+    avgChunkSize: 262144,
+    window: 64,
+    polynomial: 17437180132763653
+  }
 
   it('chunks non flat buffers', async () => {
     const b1 = Buffer.alloc(2 * 256)
@@ -35,16 +30,22 @@ describe('chunker: rabin', function () {
     b3.fill('c')
 
     const chunks = await all(chunker([b1, b2, b3], {
+      ...defaultOptions,
       minChunkSize: 48,
       avgChunkSize: 96,
-      maxChunkSize: 192,
-      window: 16,
-      polynomial: '0x3DF305DFB2A805'
+      maxChunkSize: 192
     }))
 
-    chunks.forEach((chunk) => {
-      expect(chunk).to.have.length.gte(48)
-      expect(chunk).to.have.length.lte(192)
+    const size = chunks.reduce((acc, curr) => acc + curr.length, 0)
+
+    expect(size).to.equal(b1.length + b2.length + b3.length)
+
+    chunks.forEach((chunk, index) => {
+      if (index === chunks.length - 1) {
+        expect(chunk.length).to.equal(128)
+      } else {
+        expect(chunk.length).to.equal(192)
+      }
     })
   })
 
@@ -53,11 +54,10 @@ describe('chunker: rabin', function () {
     b1.fill('a')
 
     const chunks = await all(chunker([b1], {
+      ...defaultOptions,
       maxChunkSize: 262144,
       minChunkSize: 1,
-      avgChunkSize: 256,
-      window: 16,
-      polynomial: '0x3DF305DFB2A805'
+      avgChunkSize: 256
     }))
 
     chunks.forEach((chunk) => {
@@ -70,11 +70,10 @@ describe('chunker: rabin', function () {
     const KiB256 = 262144
     let file = Buffer.concat([rawFile, Buffer.from('hello')])
     const opts = {
+      ...defaultOptions,
       minChunkSize: KiB256 / 3,
       avgChunkSize: KiB256,
-      maxChunkSize: KiB256 + (KiB256 / 2),
-      window: 16,
-      polynomial: '0x3DF305DFB2A805'
+      maxChunkSize: KiB256 + (KiB256 / 2)
     }
 
     const chunks = await all(chunker([file], opts))