From 24fb89f78ca2e7dac761ab5e8787b6714962bc8f Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Mon, 3 Sep 2018 17:24:19 +0100 Subject: [PATCH 01/10] feat: support chunked add requests --- package.json | 1 + src/http/api/resources/files.js | 10 ++- src/http/api/routes/files.js | 143 +++++++++++++++++++++++++++++++- src/http/index.js | 14 ++-- 4 files changed, 156 insertions(+), 12 deletions(-) diff --git a/package.json b/package.json index 5fb97aa240..25e1bf338c 100644 --- a/package.json +++ b/package.json @@ -170,6 +170,7 @@ "stream-to-pull-stream": "^1.7.2", "tar-stream": "^1.6.1", "temp": "~0.8.3", + "tempy": "^0.2.1", "through2": "^2.0.3", "update-notifier": "^2.5.0", "yargs": "^12.0.1", diff --git a/src/http/api/resources/files.js b/src/http/api/resources/files.js index 3e2fb28d38..13ea6cbd54 100644 --- a/src/http/api/resources/files.js +++ b/src/http/api/resources/files.js @@ -165,6 +165,7 @@ exports.add = { }, handler: (request, reply) => { + console.log('yoo') if (!request.payload) { return reply({ Message: 'Array, Buffer, or String is required.', @@ -182,6 +183,11 @@ exports.add = { parser.on('file', (fileName, fileStream) => { fileName = decodeURIComponent(fileName) + + console.log('file', fileName) + fileStream.on('data', (d) => { + console.log(d.byteLength) + }) const filePair = { path: fileName, content: toPull(fileStream) @@ -192,7 +198,7 @@ exports.add = { parser.on('directory', (directory) => { directory = decodeURIComponent(directory) - + console.log('directory', directory) fileAdder.push({ path: directory, content: '' @@ -220,7 +226,7 @@ exports.add = { rawLeaves: request.query['raw-leaves'], progress: request.query.progress ? progressHandler : null, onlyHash: request.query['only-hash'], - hashAlg: request.query['hash'], + hashAlg: request.query.hash, wrapWithDirectory: request.query['wrap-with-directory'], pin: request.query.pin, chunker: request.query.chunker diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js index fc8222f180..02d065d995 100644 --- a/src/http/api/routes/files.js +++ b/src/http/api/routes/files.js @@ -2,7 +2,66 @@ const resources = require('./../resources') const mfs = require('ipfs-mfs/http') +const fs = require('fs') +const path = require('path') +const tempy = require('tempy') +const multipart = require('ipfs-multipart') +const pull = require('pull-stream') +const toPull = require('stream-to-pull-stream') +const pushable = require('pull-pushable') +const streams = [] +const filesDir = tempy.directory() + +const createMultipartStream = (readStream, boundary, ipfs, cb) => { + const parser = new multipart.Parser({ boundary: boundary }) + readStream.pipe(parser) + const fileAdder = pushable() + + parser.on('file', (fileName, fileStream) => { + fileName = decodeURIComponent(fileName) + + const filePair = { + path: fileName, + content: toPull(fileStream) + } + console.log(filePair) + fileAdder.push(filePair) + }) + + parser.on('directory', (directory) => { + directory = decodeURIComponent(directory) + fileAdder.push({ + path: directory, + content: '' + }) + }) + + parser.on('end', () => { + fileAdder.end() + }) + + pull( + fileAdder, + ipfs.files.addPullStream(), + pull.map((file) => { + return { + Name: file.path, // addPullStream already turned this into a hash if it wanted to + Hash: file.hash, + Size: file.size + } + }), + pull.collect((err, files) => { + if (err) { + cb(err) + return + } + cb(null, files) + }) + ) + + return parser +} module.exports = (server) => { const api = server.select('API') @@ -37,13 +96,95 @@ module.exports = (server) => { config: { payload: { parse: false, - output: 'stream' + output: 'stream', + maxBytes: 10048576 }, handler: resources.files.add.handler, validate: resources.files.add.validate } }) + api.route({ + // TODO fix method + method: 'POST', + path: '/api/v0/add-chunked', + config: { + payload: { + parse: false, + maxBytes: 10048576 + }, + handler: (request, reply) => { + console.log('received') + console.log(request.headers['content-range']) + console.log(request.headers['ipfs-chunk-id']) + console.log(request.headers['ipfs-chunk-name']) + const boundary = request.headers['ipfs-chunk-boundary'] + const id = request.headers['ipfs-chunk-name'] // change name to id + const index = Number(request.headers['ipfs-chunk-id']) + const file = path.join(filesDir, id) + const match = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/) + const ipfs = request.server.app.ipfs + // if (!match || !match[1] || !match[2] || !match[3]) { + /* malformed content-range header */ + // res.send('Bad Request', 400) + // return; + // } + + const start = parseInt(match[1]) + const end = parseInt(match[2]) + const total = parseInt(match[3]) + // console.log(start, end, total, index, boundary) + + let stream = streams[id] + if (!stream) { + console.log('create new stream', file) + stream = fs.createWriteStream(file, {flags: 'a+'}) + streams[id] = stream + } + + console.log('stream', file) + let size = 0 + if (fs.existsSync(file)) { + size = fs.statSync(file).size + } + + if ((end + 1) === size) { + /* duplicate chunk */ + // res.send('Created', 201) + // return; + } + + if (start !== size) { + /* missing chunk */ + // res.send('Bad Request', 400) + // return; + } + + if (index === -1) { + // check if size + payload.length === total + /* all chunks have been received */ + stream.on('finish', function () { + console.log('add to ipfs from the file') + var readStream = fs.createReadStream(file) + createMultipartStream(readStream, boundary, ipfs, (err, files) => { + if (err) { + console.error(err) + } + console.log('finished adding to ipfs', files) + reply({files}) + }) + }) + + stream.end() + } else { + stream.write(request.payload) + /* this chunk has been processed successfully */ + reply({ Bytes: request.payload.length }) + } + } + } + }) + api.route({ // TODO fix method method: '*', diff --git a/src/http/index.js b/src/http/index.js index ed1edce219..e0a286d22b 100644 --- a/src/http/index.js +++ b/src/http/index.js @@ -103,7 +103,11 @@ function HttpApi (repo, config, cliArgs) { this.server = new Hapi.Server({ connections: { routes: { - cors: true + cors: { + origin: ['*'], + additionalHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length', 'Content-Type', 'Content-Range', 'IPFS-CHUNK-NAME', 'IPFS-CHUNK-ID', 'ipfs-chunk-boundary'], + additionalExposedHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length'] + } } }, debug: process.env.DEBUG ? { @@ -137,14 +141,6 @@ function HttpApi (repo, config, cliArgs) { // load gateway routes require('./gateway/routes')(this.server) - // Set default headers - setHeader(this.server, - 'Access-Control-Allow-Headers', - 'X-Stream-Output, X-Chunked-Output, X-Content-Length') - setHeader(this.server, - 'Access-Control-Expose-Headers', - 'X-Stream-Output, X-Chunked-Output, X-Content-Length') - this.server.start(cb) }) }, From 6c85b4ed7e3c96dc9693cf8192d8d6da70f9dbf5 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Tue, 4 Sep 2018 12:11:40 +0100 Subject: [PATCH 02/10] fix: updated header and end request handling --- src/http/api/routes/files.js | 12 ++++++------ src/http/index.js | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js index 02d065d995..711d94cf26 100644 --- a/src/http/api/routes/files.js +++ b/src/http/api/routes/files.js @@ -116,11 +116,11 @@ module.exports = (server) => { handler: (request, reply) => { console.log('received') console.log(request.headers['content-range']) - console.log(request.headers['ipfs-chunk-id']) - console.log(request.headers['ipfs-chunk-name']) - const boundary = request.headers['ipfs-chunk-boundary'] - const id = request.headers['ipfs-chunk-name'] // change name to id - const index = Number(request.headers['ipfs-chunk-id']) + console.log(request.headers['x-ipfs-chunk-index']) + console.log(request.headers['x-ipfs-chunk-group-uuid']) + const boundary = request.headers['x-ipfs-chunk-boundary'] + const id = request.headers['x-ipfs-chunk-group-uuid'] // change name to id + const index = Number(request.headers['x-ipfs-chunk-index']) const file = path.join(filesDir, id) const match = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/) const ipfs = request.server.app.ipfs @@ -160,7 +160,7 @@ module.exports = (server) => { // return; } - if (index === -1) { + if (start === total) { // check if size + payload.length === total /* all chunks have been received */ stream.on('finish', function () { diff --git a/src/http/index.js b/src/http/index.js index e0a286d22b..9fc4392ecd 100644 --- a/src/http/index.js +++ b/src/http/index.js @@ -105,7 +105,7 @@ function HttpApi (repo, config, cliArgs) { routes: { cors: { origin: ['*'], - additionalHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length', 'Content-Type', 'Content-Range', 'IPFS-CHUNK-NAME', 'IPFS-CHUNK-ID', 'ipfs-chunk-boundary'], + additionalHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length', 'Content-Type', 'Content-Range', 'x-ipfs-chunk-index', 'x-ipfs-chunk-group-uuid', 'x-ipfs-chunk-boundary'], additionalExposedHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length'] } } From 12342183289dec6b511007605e5364ea0fa192fc Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Wed, 5 Sep 2018 18:27:05 +0100 Subject: [PATCH 03/10] feat: add multipart response stream --- src/http/api/routes/files.js | 106 +++++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 31 deletions(-) diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js index 711d94cf26..910a211a3e 100644 --- a/src/http/api/routes/files.js +++ b/src/http/api/routes/files.js @@ -6,57 +6,107 @@ const fs = require('fs') const path = require('path') const tempy = require('tempy') const multipart = require('ipfs-multipart') -const pull = require('pull-stream') const toPull = require('stream-to-pull-stream') +const toStream = require('pull-stream-to-stream') +const pull = require('pull-stream') const pushable = require('pull-pushable') +const abortable = require('pull-abortable') +const { serialize } = require('pull-ndjson') const streams = [] const filesDir = tempy.directory() -const createMultipartStream = (readStream, boundary, ipfs, cb) => { +const responseError = (msg, code, request, abortStream) => { + const err = JSON.stringify({ Message: msg, Code: code }) + request.raw.res.addTrailers({ + 'X-Stream-Error': err + }) + abortStream.abort() +} +const createMultipartStream = (readStream, boundary, ipfs, request, reply, cb) => { + const fileAdder = pushable() const parser = new multipart.Parser({ boundary: boundary }) + let filesParsed = false + readStream.pipe(parser) - const fileAdder = pushable() parser.on('file', (fileName, fileStream) => { - fileName = decodeURIComponent(fileName) - - const filePair = { - path: fileName, + console.log('File: ', fileName) + filesParsed = true + fileAdder.push({ + path: decodeURIComponent(fileName), content: toPull(fileStream) - } - console.log(filePair) - fileAdder.push(filePair) + }) }) parser.on('directory', (directory) => { - directory = decodeURIComponent(directory) fileAdder.push({ - path: directory, + path: decodeURIComponent(directory), content: '' }) }) parser.on('end', () => { + console.log('multipart end') fileAdder.end() + if (!filesParsed) { + reply({ + Message: "File argument 'data' is required.", + Code: 0, + Type: 'error' + }).code(400).takeover() + } }) + const pushStream = pushable() + const abortStream = abortable() + const replyStream = toStream.source(pull( + pushStream, + abortStream, + serialize() + )) + + // Fix Hapi Error: Stream must have a streams2 readable interface + if (!replyStream._read) { + replyStream._read = () => {} + replyStream._readableState = {} + replyStream.unpipe = () => {} + } + + // setup reply + reply(replyStream) + .header('x-chunked-output', '1') + .header('content-type', 'application/json') + .header('Trailer', 'X-Stream-Error') + + const progressHandler = (bytes) => { + pushStream.push({ Bytes: bytes }) + } + // ipfs add options + const options = { + cidVersion: request.query['cid-version'], + rawLeaves: request.query['raw-leaves'], + progress: request.query.progress ? progressHandler : null, + onlyHash: request.query['only-hash'], + hashAlg: request.query.hash, + wrapWithDirectory: request.query['wrap-with-directory'], + pin: request.query.pin, + chunker: request.query.chunker + } + pull( fileAdder, - ipfs.files.addPullStream(), - pull.map((file) => { - return { - Name: file.path, // addPullStream already turned this into a hash if it wanted to - Hash: file.hash, - Size: file.size - } - }), + ipfs.files.addPullStream(options), pull.collect((err, files) => { if (err) { - cb(err) - return + return responseError(err.msg, 0, request) } - cb(null, files) + if (files.length === 0) { + return responseError('Failed to add files.', 0, request) + } + console.log(files) + files.forEach((f) => pushStream.push(f)) + pushStream.end() }) ) @@ -111,7 +161,7 @@ module.exports = (server) => { config: { payload: { parse: false, - maxBytes: 10048576 + maxBytes: 10485760 }, handler: (request, reply) => { console.log('received') @@ -166,13 +216,7 @@ module.exports = (server) => { stream.on('finish', function () { console.log('add to ipfs from the file') var readStream = fs.createReadStream(file) - createMultipartStream(readStream, boundary, ipfs, (err, files) => { - if (err) { - console.error(err) - } - console.log('finished adding to ipfs', files) - reply({files}) - }) + createMultipartStream(readStream, boundary, ipfs, request, reply) }) stream.end() From 95dfd2128c1601466f118dd3aba75a966ef8a81c Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Thu, 6 Sep 2018 14:59:53 +0100 Subject: [PATCH 04/10] fix: fix error handling and cleanup log --- src/http/api/resources/files.js | 6 ---- src/http/api/routes/files.js | 51 ++++++++++++--------------------- 2 files changed, 19 insertions(+), 38 deletions(-) diff --git a/src/http/api/resources/files.js b/src/http/api/resources/files.js index 13ea6cbd54..b263a8fa86 100644 --- a/src/http/api/resources/files.js +++ b/src/http/api/resources/files.js @@ -165,7 +165,6 @@ exports.add = { }, handler: (request, reply) => { - console.log('yoo') if (!request.payload) { return reply({ Message: 'Array, Buffer, or String is required.', @@ -184,10 +183,6 @@ exports.add = { parser.on('file', (fileName, fileStream) => { fileName = decodeURIComponent(fileName) - console.log('file', fileName) - fileStream.on('data', (d) => { - console.log(d.byteLength) - }) const filePair = { path: fileName, content: toPull(fileStream) @@ -198,7 +193,6 @@ exports.add = { parser.on('directory', (directory) => { directory = decodeURIComponent(directory) - console.log('directory', directory) fileAdder.push({ path: directory, content: '' diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js index 910a211a3e..6cf0049c31 100644 --- a/src/http/api/routes/files.js +++ b/src/http/api/routes/files.js @@ -16,23 +16,14 @@ const { serialize } = require('pull-ndjson') const streams = [] const filesDir = tempy.directory() -const responseError = (msg, code, request, abortStream) => { - const err = JSON.stringify({ Message: msg, Code: code }) - request.raw.res.addTrailers({ - 'X-Stream-Error': err - }) - abortStream.abort() -} -const createMultipartStream = (readStream, boundary, ipfs, request, reply, cb) => { +const createMultipartReply = (readStream, boundary, ipfs, query, reply) => { const fileAdder = pushable() const parser = new multipart.Parser({ boundary: boundary }) - let filesParsed = false readStream.pipe(parser) parser.on('file', (fileName, fileStream) => { console.log('File: ', fileName) - filesParsed = true fileAdder.push({ path: decodeURIComponent(fileName), content: toPull(fileStream) @@ -49,15 +40,10 @@ const createMultipartStream = (readStream, boundary, ipfs, request, reply, cb) = parser.on('end', () => { console.log('multipart end') fileAdder.end() - if (!filesParsed) { - reply({ - Message: "File argument 'data' is required.", - Code: 0, - Type: 'error' - }).code(400).takeover() - } }) + // TODO: handle multipart errors + const pushStream = pushable() const abortStream = abortable() const replyStream = toStream.source(pull( @@ -77,21 +63,20 @@ const createMultipartStream = (readStream, boundary, ipfs, request, reply, cb) = reply(replyStream) .header('x-chunked-output', '1') .header('content-type', 'application/json') - .header('Trailer', 'X-Stream-Error') const progressHandler = (bytes) => { pushStream.push({ Bytes: bytes }) } // ipfs add options const options = { - cidVersion: request.query['cid-version'], - rawLeaves: request.query['raw-leaves'], - progress: request.query.progress ? progressHandler : null, - onlyHash: request.query['only-hash'], - hashAlg: request.query.hash, - wrapWithDirectory: request.query['wrap-with-directory'], - pin: request.query.pin, - chunker: request.query.chunker + cidVersion: query['cid-version'], + rawLeaves: query['raw-leaves'], + progress: query.progress ? progressHandler : null, + onlyHash: query['only-hash'], + hashAlg: query.hash, + wrapWithDirectory: query['wrap-with-directory'], + pin: query.pin, + chunker: query.chunker } pull( @@ -99,12 +84,14 @@ const createMultipartStream = (readStream, boundary, ipfs, request, reply, cb) = ipfs.files.addPullStream(options), pull.collect((err, files) => { if (err) { - return responseError(err.msg, 0, request) - } - if (files.length === 0) { - return responseError('Failed to add files.', 0, request) + pushStream.push({ + Message: err.toString(), + Code: 0, + Type: 'error' + }) + pushStream.end() + return } - console.log(files) files.forEach((f) => pushStream.push(f)) pushStream.end() }) @@ -216,7 +203,7 @@ module.exports = (server) => { stream.on('finish', function () { console.log('add to ipfs from the file') var readStream = fs.createReadStream(file) - createMultipartStream(readStream, boundary, ipfs, request, reply) + createMultipartReply(readStream, boundary, ipfs, request.query, reply) }) stream.end() From eee889b5ef6c4240156950fff59e032d59309a25 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Thu, 6 Sep 2018 17:57:52 +0100 Subject: [PATCH 05/10] feat: use multiple files and on sucess delete those --- package.json | 3 ++ src/http/api/routes/files.js | 97 +++++++++++++++++------------------- 2 files changed, 48 insertions(+), 52 deletions(-) diff --git a/package.json b/package.json index 25e1bf338c..97dae23dc0 100644 --- a/package.json +++ b/package.json @@ -93,7 +93,9 @@ "byteman": "^1.3.5", "cids": "~0.5.3", "debug": "^3.1.0", + "del": "^3.0.0", "err-code": "^1.1.2", + "fast-glob": "^2.2.2", "file-type": "^8.1.0", "filesize": "^3.6.1", "fnv1a": "^1.0.1", @@ -167,6 +169,7 @@ "read-pkg-up": "^4.0.0", "readable-stream": "2.3.6", "receptacle": "^1.3.2", + "stream-concat": "^0.3.0", "stream-to-pull-stream": "^1.7.2", "tar-stream": "^1.6.1", "temp": "~0.8.3", diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js index 6cf0049c31..f5896081d7 100644 --- a/src/http/api/routes/files.js +++ b/src/http/api/routes/files.js @@ -1,10 +1,12 @@ 'use strict' -const resources = require('./../resources') -const mfs = require('ipfs-mfs/http') const fs = require('fs') const path = require('path') const tempy = require('tempy') +const del = require('del') +const StreamConcat = require('stream-concat') +const boom = require('boom') +const glob = require('fast-glob') const multipart = require('ipfs-multipart') const toPull = require('stream-to-pull-stream') const toStream = require('pull-stream-to-stream') @@ -12,18 +14,18 @@ const pull = require('pull-stream') const pushable = require('pull-pushable') const abortable = require('pull-abortable') const { serialize } = require('pull-ndjson') +const mfs = require('ipfs-mfs/http') +const resources = require('./../resources') -const streams = [] const filesDir = tempy.directory() -const createMultipartReply = (readStream, boundary, ipfs, query, reply) => { +const createMultipartReply = (readStream, boundary, ipfs, query, reply, cb) => { const fileAdder = pushable() const parser = new multipart.Parser({ boundary: boundary }) readStream.pipe(parser) parser.on('file', (fileName, fileStream) => { - console.log('File: ', fileName) fileAdder.push({ path: decodeURIComponent(fileName), content: toPull(fileStream) @@ -38,7 +40,6 @@ const createMultipartReply = (readStream, boundary, ipfs, query, reply) => { }) parser.on('end', () => { - console.log('multipart end') fileAdder.end() }) @@ -94,10 +95,9 @@ const createMultipartReply = (readStream, boundary, ipfs, query, reply) => { } files.forEach((f) => pushStream.push(f)) pushStream.end() + cb() }) ) - - return parser } module.exports = (server) => { const api = server.select('API') @@ -142,7 +142,6 @@ module.exports = (server) => { }) api.route({ - // TODO fix method method: 'POST', path: '/api/v0/add-chunked', config: { @@ -151,65 +150,59 @@ module.exports = (server) => { maxBytes: 10485760 }, handler: (request, reply) => { - console.log('received') - console.log(request.headers['content-range']) - console.log(request.headers['x-ipfs-chunk-index']) - console.log(request.headers['x-ipfs-chunk-group-uuid']) + // console.log('received') + // console.log(request.headers['content-range']) + // console.log(request.headers['x-ipfs-chunk-index']) + // console.log(request.headers['x-ipfs-chunk-group-uuid']) const boundary = request.headers['x-ipfs-chunk-boundary'] - const id = request.headers['x-ipfs-chunk-group-uuid'] // change name to id + const id = request.headers['x-ipfs-chunk-group-uuid'] const index = Number(request.headers['x-ipfs-chunk-index']) - const file = path.join(filesDir, id) + const file = path.join(filesDir, id) + '-' + index const match = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/) const ipfs = request.server.app.ipfs - // if (!match || !match[1] || !match[2] || !match[3]) { - /* malformed content-range header */ - // res.send('Bad Request', 400) - // return; - // } + + if (!match || !match[1] || !match[2] || !match[3]) { + return boom.badRequest('malformed content-range header') + } const start = parseInt(match[1]) const end = parseInt(match[2]) const total = parseInt(match[3]) - // console.log(start, end, total, index, boundary) - let stream = streams[id] - if (!stream) { - console.log('create new stream', file) - stream = fs.createWriteStream(file, {flags: 'a+'}) - streams[id] = stream - } - - console.log('stream', file) - let size = 0 - if (fs.existsSync(file)) { - size = fs.statSync(file).size - } - - if ((end + 1) === size) { - /* duplicate chunk */ - // res.send('Created', 201) - // return; - } - - if (start !== size) { - /* missing chunk */ - // res.send('Bad Request', 400) - // return; - } + // TODO validate duplicates, missing chunks if (start === total) { - // check if size + payload.length === total /* all chunks have been received */ - stream.on('finish', function () { - console.log('add to ipfs from the file') - var readStream = fs.createReadStream(file) - createMultipartReply(readStream, boundary, ipfs, request.query, reply) + const base = path.join(filesDir, id) + '-' + const pattern = base + '*' + const files = glob.sync(pattern) + + files.sort((a, b) => { + return Number(a.replace(base, '')) - Number(b.replace(base, '')) }) - stream.end() + let fileIndex = 0 + const nextStream = () => fileIndex === files.length ? null : fs.createReadStream(files[fileIndex++]) + createMultipartReply( + new StreamConcat(nextStream), + boundary, + ipfs, + request.query, + reply, + () => { + console.log('Finished adding') + del(pattern, { force: true }) + .then(paths => { + console.log('Deleted files and folders:\n', paths.join('\n')) + }) + .catch(console.error) + } + ) } else { + const stream = fs.createWriteStream(file) stream.write(request.payload) - /* this chunk has been processed successfully */ + + // TODO handle errors reply({ Bytes: request.payload.length }) } } From 0d4581a7695e54e88af5b35ddf402881f0d91f8b Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Fri, 7 Sep 2018 18:54:35 +0100 Subject: [PATCH 06/10] feat: handle non chunked --- src/http/api/routes/files.js | 45 ++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js index f5896081d7..efcb8f4094 100644 --- a/src/http/api/routes/files.js +++ b/src/http/api/routes/files.js @@ -6,6 +6,7 @@ const tempy = require('tempy') const del = require('del') const StreamConcat = require('stream-concat') const boom = require('boom') +const pump = require('pump') const glob = require('fast-glob') const multipart = require('ipfs-multipart') const toPull = require('stream-to-pull-stream') @@ -21,8 +22,10 @@ const filesDir = tempy.directory() const createMultipartReply = (readStream, boundary, ipfs, query, reply, cb) => { const fileAdder = pushable() - const parser = new multipart.Parser({ boundary: boundary }) + let parser = null + // use the other multipart factory for non chunked to get the boundary + parser = new multipart.Parser({ boundary: boundary }) readStream.pipe(parser) parser.on('file', (fileName, fileStream) => { @@ -147,19 +150,38 @@ module.exports = (server) => { config: { payload: { parse: false, - maxBytes: 10485760 + output: 'stream', + maxBytes: 1000 * 1024 * 1024 + // maxBytes: 10485760 }, handler: (request, reply) => { // console.log('received') // console.log(request.headers['content-range']) // console.log(request.headers['x-ipfs-chunk-index']) // console.log(request.headers['x-ipfs-chunk-group-uuid']) - const boundary = request.headers['x-ipfs-chunk-boundary'] const id = request.headers['x-ipfs-chunk-group-uuid'] + const boundary = request.headers['x-ipfs-chunk-boundary'] + const ipfs = request.server.app.ipfs + + // non chunked + + if (!id) { + createMultipartReply( + request.payload, + boundary, + ipfs, + request.query, + reply, + () => { + console.log('Finished adding') + } + ) + + return + } const index = Number(request.headers['x-ipfs-chunk-index']) const file = path.join(filesDir, id) + '-' + index const match = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/) - const ipfs = request.server.app.ipfs if (!match || !match[1] || !match[2] || !match[3]) { return boom.badRequest('malformed content-range header') @@ -199,11 +221,16 @@ module.exports = (server) => { } ) } else { - const stream = fs.createWriteStream(file) - stream.write(request.payload) - - // TODO handle errors - reply({ Bytes: request.payload.length }) + pump( + request.payload, + fs.createWriteStream(file), + (err) => { + if (err) { + reply(err) + } + reply({ Bytes: total }) + } + ) } } } From cd3d25ef400932f627f724d902c712481a7dd95f Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Mon, 10 Sep 2018 17:10:28 +0100 Subject: [PATCH 07/10] fix: new header and more error handling --- src/http/api/routes/files.js | 102 ++++++++++++++++++++--------------- src/http/index.js | 4 +- 2 files changed, 62 insertions(+), 44 deletions(-) diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js index efcb8f4094..e5b1f62f12 100644 --- a/src/http/api/routes/files.js +++ b/src/http/api/routes/files.js @@ -8,7 +8,9 @@ const StreamConcat = require('stream-concat') const boom = require('boom') const pump = require('pump') const glob = require('fast-glob') -const multipart = require('ipfs-multipart') +const Joi = require('joi') +const content = require('content') +const { Parser } = require('ipfs-multipart') const toPull = require('stream-to-pull-stream') const toStream = require('pull-stream-to-stream') const pull = require('pull-stream') @@ -20,12 +22,24 @@ const resources = require('./../resources') const filesDir = tempy.directory() -const createMultipartReply = (readStream, boundary, ipfs, query, reply, cb) => { - const fileAdder = pushable() - let parser = null +const parseChunkedInput = (request) => { + const input = request.headers['x-chunked-input'] + const regex = /^uuid="([^"]+)";\s*index=(\d*)/i + + if (!input) { + return null + } + const match = input.match(regex) - // use the other multipart factory for non chunked to get the boundary - parser = new multipart.Parser({ boundary: boundary }) + return [match[1], Number(match[2])] +} + +const createMultipartReply = (readStream, request, reply, cb) => { + const fileAdder = pushable() + const boundary = content.type(request.headers['content-type']).boundary + const ipfs = request.server.app.ipfs + const query = request.query + const parser = new Parser({ boundary: boundary }) readStream.pipe(parser) parser.on('file', (fileName, fileStream) => { @@ -46,7 +60,7 @@ const createMultipartReply = (readStream, boundary, ipfs, query, reply, cb) => { fileAdder.end() }) - // TODO: handle multipart errors + parser.on('error', err => cb(err)) const pushStream = pushable() const abortStream = abortable() @@ -136,7 +150,7 @@ module.exports = (server) => { config: { payload: { parse: false, - output: 'stream', + // output: 'stream', maxBytes: 10048576 }, handler: resources.files.add.handler, @@ -154,48 +168,49 @@ module.exports = (server) => { maxBytes: 1000 * 1024 * 1024 // maxBytes: 10485760 }, + validate: { + headers: { + 'content-range': Joi.string().regex(/(\d+)-(\d+)\/(\d+|\*)/), + 'x-chunked-input': Joi.string().regex(/^uuid="([^"]+)";\s*index=(\d*)/i) + }, + options: { + allowUnknown: true + } + }, handler: (request, reply) => { - // console.log('received') - // console.log(request.headers['content-range']) - // console.log(request.headers['x-ipfs-chunk-index']) - // console.log(request.headers['x-ipfs-chunk-group-uuid']) - const id = request.headers['x-ipfs-chunk-group-uuid'] - const boundary = request.headers['x-ipfs-chunk-boundary'] - const ipfs = request.server.app.ipfs + const chunkedInput = parseChunkedInput(request) - // non chunked + if (boom.isBoom(chunkedInput)) { + return reply(chunkedInput) + } - if (!id) { + // non chunked + if (!chunkedInput) { createMultipartReply( request.payload, - boundary, - ipfs, - request.query, + request, reply, - () => { + (err) => { + if (err) { + return reply(err) + } console.log('Finished adding') } ) return } - const index = Number(request.headers['x-ipfs-chunk-index']) - const file = path.join(filesDir, id) + '-' + index - const match = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/) - - if (!match || !match[1] || !match[2] || !match[3]) { - return boom.badRequest('malformed content-range header') - } - const start = parseInt(match[1]) - const end = parseInt(match[2]) - const total = parseInt(match[3]) + // chunked + const [uuid, index] = chunkedInput + const [, start, , total] = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/) + const file = path.join(filesDir, uuid) + '-' + index // TODO validate duplicates, missing chunks if (start === total) { /* all chunks have been received */ - const base = path.join(filesDir, id) + '-' + const base = path.join(filesDir, uuid) + '-' const pattern = base + '*' const files = glob.sync(pattern) @@ -207,26 +222,29 @@ module.exports = (server) => { const nextStream = () => fileIndex === files.length ? null : fs.createReadStream(files[fileIndex++]) createMultipartReply( new StreamConcat(nextStream), - boundary, - ipfs, - request.query, + request, reply, - () => { + (err) => { + if (err) { + return reply(err) + } + console.log('Finished adding') - del(pattern, { force: true }) - .then(paths => { - console.log('Deleted files and folders:\n', paths.join('\n')) - }) - .catch(console.error) + // del(pattern, { force: true }) + // .then(paths => { + // console.log('Deleted files and folders:\n', paths.join('\n')) + // }) + // .catch(console.error) } ) } else { + console.log(file) pump( request.payload, fs.createWriteStream(file), (err) => { if (err) { - reply(err) + return reply(err) } reply({ Bytes: total }) } diff --git a/src/http/index.js b/src/http/index.js index 9fc4392ecd..8a9336cf09 100644 --- a/src/http/index.js +++ b/src/http/index.js @@ -105,8 +105,8 @@ function HttpApi (repo, config, cliArgs) { routes: { cors: { origin: ['*'], - additionalHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length', 'Content-Type', 'Content-Range', 'x-ipfs-chunk-index', 'x-ipfs-chunk-group-uuid', 'x-ipfs-chunk-boundary'], - additionalExposedHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length'] + additionalHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length', 'Content-Type', 'Content-Range', 'X-Chunked-Input'], + additionalExposedHeaders: ['X-Stream-Output, X-Chunked-Output, X-Content-Length', 'X-Chunked-Input'] } } }, From 405a7e6d2234fa5d967dbb90afb0041e97910791 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Mon, 10 Sep 2018 17:11:02 +0100 Subject: [PATCH 08/10] fix: remove log --- src/http/api/routes/files.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js index e5b1f62f12..5781f5426b 100644 --- a/src/http/api/routes/files.js +++ b/src/http/api/routes/files.js @@ -238,7 +238,6 @@ module.exports = (server) => { } ) } else { - console.log(file) pump( request.payload, fs.createWriteStream(file), From e5c1d80f8ed311eecc35f86842be43b29d3369c2 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Wed, 12 Sep 2018 11:58:07 +0100 Subject: [PATCH 09/10] chore: cleanup --- package.json | 1 - src/http/api/routes/files.js | 10 +++++----- src/http/index.js | 1 - 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/package.json b/package.json index 97dae23dc0..a3437153ea 100644 --- a/package.json +++ b/package.json @@ -103,7 +103,6 @@ "get-folder-size": "^2.0.0", "glob": "^7.1.2", "hapi": "^16.6.2", - "hapi-set-header": "^1.0.2", "hoek": "^5.0.3", "human-to-milliseconds": "^1.0.0", "interface-datastore": "~0.4.2", diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js index 5781f5426b..e33b336ec8 100644 --- a/src/http/api/routes/files.js +++ b/src/http/api/routes/files.js @@ -230,11 +230,11 @@ module.exports = (server) => { } console.log('Finished adding') - // del(pattern, { force: true }) - // .then(paths => { - // console.log('Deleted files and folders:\n', paths.join('\n')) - // }) - // .catch(console.error) + del(pattern, { force: true }) + .then(paths => { + console.log('Deleted files and folders:\n', paths.join('\n')) + }) + .catch(console.error) } ) } else { diff --git a/src/http/index.js b/src/http/index.js index 8a9336cf09..706365e717 100644 --- a/src/http/index.js +++ b/src/http/index.js @@ -4,7 +4,6 @@ const series = require('async/series') const Hapi = require('hapi') const debug = require('debug') const multiaddr = require('multiaddr') -const setHeader = require('hapi-set-header') const once = require('once') const IPFS = require('../core') From a0d731b542c808b530c7bcfb616ae35298f7d3db Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Mon, 17 Sep 2018 13:47:15 +0100 Subject: [PATCH 10/10] fix: cleanup, file restructure, reply stream handling --- src/core/components/files.js | 3 +- src/http/api/resources/files.js | 85 +++++++++++ src/http/api/routes/files.js | 204 +------------------------ src/http/api/utils/add-experimental.js | 146 ++++++++++++++++++ 4 files changed, 238 insertions(+), 200 deletions(-) create mode 100644 src/http/api/utils/add-experimental.js diff --git a/src/core/components/files.js b/src/core/components/files.js index d575d893fa..6d7122e8a7 100644 --- a/src/core/components/files.js +++ b/src/core/components/files.js @@ -13,7 +13,7 @@ const deferred = require('pull-defer') const waterfall = require('async/waterfall') const isStream = require('is-stream') const isSource = require('is-pull-stream').isSource -const Duplex = require('readable-stream').Duplex +const Duplex = require('stream').Duplex const OtherBuffer = require('buffer').Buffer const CID = require('cids') const toB58String = require('multihashes').toB58String @@ -136,6 +136,7 @@ class AddHelper extends Duplex { if (end instanceof Error) { this.emit('error', end) } + this.push(null) } else { this.push(data) } diff --git a/src/http/api/resources/files.js b/src/http/api/resources/files.js index b263a8fa86..64d0a06afe 100644 --- a/src/http/api/resources/files.js +++ b/src/http/api/resources/files.js @@ -1,5 +1,7 @@ 'use strict' +const path = require('path') +const fs = require('fs') const mh = require('multihashes') const multipart = require('ipfs-multipart') const debug = require('debug') @@ -10,10 +12,21 @@ const pull = require('pull-stream') const toPull = require('stream-to-pull-stream') const pushable = require('pull-pushable') const each = require('async/each') +const content = require('content') const toStream = require('pull-stream-to-stream') const abortable = require('pull-abortable') const Joi = require('joi') +const pump = require('pump') +const tempy = require('tempy') const ndjson = require('pull-ndjson') +const { + parseChunkedInput, + createMultipartReply, + matchMultipartEnd, + processAndAdd +} = require('../utils/add-experimental') + +const filesDir = tempy.directory() exports = module.exports @@ -282,6 +295,78 @@ exports.add = { } } +exports.addExperimental = { + validate: { + query: Joi.object() + .keys({ + 'cid-version': Joi.number().integer().min(0).max(1).default(0), + 'raw-leaves': Joi.boolean(), + 'only-hash': Joi.boolean(), + pin: Joi.boolean().default(true), + 'wrap-with-directory': Joi.boolean(), + chunker: Joi.string() + }) + // TODO: Necessary until validate "recursive", "stream-channels" etc. + .options({ allowUnknown: true }), + headers: { + 'content-range': Joi.string().regex(/(\d+)-(\d+)\/(\d+|\*)/), + 'x-chunked-input': Joi.string().regex(/^uuid="([^"]+)";\s*index=(\d*)/i) + }, + options: { + allowUnknown: true + } + }, + + handler: (request, reply) => { + const chunkedInput = parseChunkedInput(request) + + // non chunked + if (!chunkedInput) { + createMultipartReply( + request.payload, + request, + reply, + (err) => { + if (err) { + return reply(err) + } + } + ) + + return + } + + // chunked + const [uuid, index] = chunkedInput + const [, , , total] = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/) + const file = path.join(filesDir, uuid) + '-' + index + + // TODO validate duplicates, missing chunks when resumeable and concurrent request are supported + + pump( + request.payload, + fs.createWriteStream(file), + (err) => { + if (err) { + return reply(err) + } + const boundary = content.type(request.headers['content-type']).boundary + matchMultipartEnd(file, boundary, (err, isEnd) => { + if (err) { + return reply(err) + } + + if (isEnd) { + processAndAdd(uuid, filesDir, request, reply) + } else { + reply({ Bytes: total }) + } + }) + } + ) + } +} + exports.immutableLs = { // uses common parseKey method that returns a `key` parseArgs: exports.parseKey, diff --git a/src/http/api/routes/files.js b/src/http/api/routes/files.js index e33b336ec8..0dd95af105 100644 --- a/src/http/api/routes/files.js +++ b/src/http/api/routes/files.js @@ -1,121 +1,8 @@ 'use strict' -const fs = require('fs') -const path = require('path') -const tempy = require('tempy') -const del = require('del') -const StreamConcat = require('stream-concat') -const boom = require('boom') -const pump = require('pump') -const glob = require('fast-glob') -const Joi = require('joi') -const content = require('content') -const { Parser } = require('ipfs-multipart') -const toPull = require('stream-to-pull-stream') -const toStream = require('pull-stream-to-stream') -const pull = require('pull-stream') -const pushable = require('pull-pushable') -const abortable = require('pull-abortable') -const { serialize } = require('pull-ndjson') const mfs = require('ipfs-mfs/http') const resources = require('./../resources') -const filesDir = tempy.directory() - -const parseChunkedInput = (request) => { - const input = request.headers['x-chunked-input'] - const regex = /^uuid="([^"]+)";\s*index=(\d*)/i - - if (!input) { - return null - } - const match = input.match(regex) - - return [match[1], Number(match[2])] -} - -const createMultipartReply = (readStream, request, reply, cb) => { - const fileAdder = pushable() - const boundary = content.type(request.headers['content-type']).boundary - const ipfs = request.server.app.ipfs - const query = request.query - const parser = new Parser({ boundary: boundary }) - readStream.pipe(parser) - - parser.on('file', (fileName, fileStream) => { - fileAdder.push({ - path: decodeURIComponent(fileName), - content: toPull(fileStream) - }) - }) - - parser.on('directory', (directory) => { - fileAdder.push({ - path: decodeURIComponent(directory), - content: '' - }) - }) - - parser.on('end', () => { - fileAdder.end() - }) - - parser.on('error', err => cb(err)) - - const pushStream = pushable() - const abortStream = abortable() - const replyStream = toStream.source(pull( - pushStream, - abortStream, - serialize() - )) - - // Fix Hapi Error: Stream must have a streams2 readable interface - if (!replyStream._read) { - replyStream._read = () => {} - replyStream._readableState = {} - replyStream.unpipe = () => {} - } - - // setup reply - reply(replyStream) - .header('x-chunked-output', '1') - .header('content-type', 'application/json') - - const progressHandler = (bytes) => { - pushStream.push({ Bytes: bytes }) - } - // ipfs add options - const options = { - cidVersion: query['cid-version'], - rawLeaves: query['raw-leaves'], - progress: query.progress ? progressHandler : null, - onlyHash: query['only-hash'], - hashAlg: query.hash, - wrapWithDirectory: query['wrap-with-directory'], - pin: query.pin, - chunker: query.chunker - } - - pull( - fileAdder, - ipfs.files.addPullStream(options), - pull.collect((err, files) => { - if (err) { - pushStream.push({ - Message: err.toString(), - Code: 0, - Type: 'error' - }) - pushStream.end() - return - } - files.forEach((f) => pushStream.push(f)) - pushStream.end() - cb() - }) - ) -} module.exports = (server) => { const api = server.select('API') @@ -150,8 +37,8 @@ module.exports = (server) => { config: { payload: { parse: false, - // output: 'stream', - maxBytes: 10048576 + output: 'stream', + maxBytes: 1000 * 1024 * 1024 }, handler: resources.files.add.handler, validate: resources.files.add.validate @@ -160,96 +47,15 @@ module.exports = (server) => { api.route({ method: 'POST', - path: '/api/v0/add-chunked', + path: '/api/v0/add-experimental', config: { payload: { parse: false, output: 'stream', maxBytes: 1000 * 1024 * 1024 - // maxBytes: 10485760 - }, - validate: { - headers: { - 'content-range': Joi.string().regex(/(\d+)-(\d+)\/(\d+|\*)/), - 'x-chunked-input': Joi.string().regex(/^uuid="([^"]+)";\s*index=(\d*)/i) - }, - options: { - allowUnknown: true - } }, - handler: (request, reply) => { - const chunkedInput = parseChunkedInput(request) - - if (boom.isBoom(chunkedInput)) { - return reply(chunkedInput) - } - - // non chunked - if (!chunkedInput) { - createMultipartReply( - request.payload, - request, - reply, - (err) => { - if (err) { - return reply(err) - } - console.log('Finished adding') - } - ) - - return - } - - // chunked - const [uuid, index] = chunkedInput - const [, start, , total] = request.headers['content-range'].match(/(\d+)-(\d+)\/(\d+|\*)/) - const file = path.join(filesDir, uuid) + '-' + index - - // TODO validate duplicates, missing chunks - - if (start === total) { - /* all chunks have been received */ - const base = path.join(filesDir, uuid) + '-' - const pattern = base + '*' - const files = glob.sync(pattern) - - files.sort((a, b) => { - return Number(a.replace(base, '')) - Number(b.replace(base, '')) - }) - - let fileIndex = 0 - const nextStream = () => fileIndex === files.length ? null : fs.createReadStream(files[fileIndex++]) - createMultipartReply( - new StreamConcat(nextStream), - request, - reply, - (err) => { - if (err) { - return reply(err) - } - - console.log('Finished adding') - del(pattern, { force: true }) - .then(paths => { - console.log('Deleted files and folders:\n', paths.join('\n')) - }) - .catch(console.error) - } - ) - } else { - pump( - request.payload, - fs.createWriteStream(file), - (err) => { - if (err) { - return reply(err) - } - reply({ Bytes: total }) - } - ) - } - } + validate: resources.files.addExperimental.validate, + handler: resources.files.addExperimental.handler } }) diff --git a/src/http/api/utils/add-experimental.js b/src/http/api/utils/add-experimental.js new file mode 100644 index 0000000000..53df513c67 --- /dev/null +++ b/src/http/api/utils/add-experimental.js @@ -0,0 +1,146 @@ +'use strict' + +const fs = require('fs') +const path = require('path') +const { EOL } = require('os') +const { Readable } = require('stream') +const glob = require('fast-glob') +const StreamConcat = require('stream-concat') +const del = require('del') +const content = require('content') +const { Parser } = require('ipfs-multipart') + +const processAndAdd = (uuid, filesDir, request, reply) => { + // all chunks have been received + // TODO : here we have full size we can calculate the number of chunks to validate we have all the bytes + const base = path.join(filesDir, uuid) + '-' + const pattern = base + '*' + const files = glob.sync(pattern) + + files.sort((a, b) => { + return Number(a.replace(base, '')) - Number(b.replace(base, '')) + }) + + let fileIndex = 0 + const nextStream = () => fileIndex === files.length + ? null + : fs.createReadStream(files[fileIndex++]) + + createMultipartReply( + new StreamConcat(nextStream), + request, + reply, + (err) => { + if (err) { + return reply(err) + } + del(pattern, { force: true }) + .then(paths => { + console.log('Deleted files and folders:\n', paths.join('\n')) + }) + .catch(console.error) + } + ) +} + +const matchMultipartEnd = (file, boundary, cb) => { + const buffer = Buffer.alloc(56) + const fs = require('fs') + fs.open(file, 'r', (err, fd) => { + if (err) { + cb(err) + } + + fs.fstat(fd, (err, stats) => { + if (err) { + cb(err) + } + + fs.read(fd, buffer, 0, buffer.length, stats.size - 58, function (e, l, b) { + cb(null, b.toString().includes(boundary)) + }) + fs.close(fd) + }) + }) +} + +const parseChunkedInput = (request) => { + const input = request.headers['x-chunked-input'] + const regex = /^uuid="([^"]+)";\s*index=(\d*)/i + + if (!input) { + return null + } + const match = input.match(regex) + + return [match[1], Number(match[2])] +} + +const createMultipartReply = (readStream, request, reply, cb) => { + const boundary = content.type(request.headers['content-type']).boundary + const ipfs = request.server.app.ipfs + const query = request.query + const parser = new Parser({ boundary: boundary }) + const replyStream = new Readable({ read: () => {} }) + const serialize = d => JSON.stringify(d) + EOL + const progressHandler = (bytes) => { + replyStream.push(serialize({ Bytes: bytes })) + } + // ipfs add options + const options = { + cidVersion: query['cid-version'], + rawLeaves: query['raw-leaves'], + progress: query.progress ? progressHandler : null, + onlyHash: query['only-hash'], + hashAlg: query.hash, + wrapWithDirectory: query['wrap-with-directory'], + pin: query.pin, + chunker: query.chunker + } + const addStream = ipfs.files.addReadableStream(options) + + // Setup add pipeline + addStream.on('data', file => { + replyStream.push(serialize({ + Name: file.path, + Hash: file.hash, + Size: file.size + })) + }) + addStream.on('end', () => replyStream.push(null)) + addStream.on('error', cb) + + // Setup multipart parser + parser.on('file', (fileName, fileStream) => { + addStream.write({ + path: decodeURIComponent(fileName), + content: fileStream + }) + }) + parser.on('directory', (directory) => { + addStream.write({ + path: decodeURIComponent(directory), + content: '' + }) + }) + parser.on('end', () => { + addStream.end() + }) + parser.on('error', cb) + + // Send replyStream to reply + reply(replyStream) + .header('x-chunked-output', '1') + .header('content-encoding', 'identity') // stop gzip from buffering, see https://github.com/hapijs/hapi/issues/2975 + .header('content-type', 'application/json') + + // start piping data to multipart parser + readStream.pipe(parser) +} + +module.exports = { + processAndAdd, + matchMultipartEnd, + parseChunkedInput, + createMultipartReply +}