From 4c320e8285dab06beeac4e20ed0af762669a5aff Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 24 Jan 2019 15:59:33 +0000 Subject: [PATCH 01/49] refactor: gateway for hapi 18 License: MIT Signed-off-by: Alan Shaw --- package.json | 3 +- src/cli/commands/daemon.js | 29 +- src/http/gateway/resources/gateway.js | 214 +++++++-------- src/http/gateway/routes/gateway.js | 26 +- src/http/gateway/routes/index.js | 4 +- src/http/index.js | 315 +++++++++++----------- test/gateway/index.js | 367 +++++++++++--------------- 7 files changed, 433 insertions(+), 525 deletions(-) diff --git a/package.json b/package.json index 02134a2e0c..7b26801a04 100644 --- a/package.json +++ b/package.json @@ -100,7 +100,8 @@ "fsm-event": "^2.1.0", "get-folder-size": "^2.0.0", "glob": "^7.1.3", - "hapi": "^16.6.2", + "hapi": "^18.0.0", + "hapi-pino": "^5.2.0", "hapi-set-header": "^1.0.2", "hoek": "^6.1.2", "human-to-milliseconds": "^1.0.0", diff --git a/src/cli/commands/daemon.js b/src/cli/commands/daemon.js index 71024150de..d14dc07bb8 100644 --- a/src/cli/commands/daemon.js +++ b/src/cli/commands/daemon.js @@ -1,10 +1,6 @@ 'use strict' -const promisify = require('promisify-es6') -const utils = require('../utils') -const print = utils.print - -let httpAPI +const { getRepoPath, print, ipfsPathHelp } = require('../utils') module.exports = { command: 'daemon', @@ -13,7 +9,7 @@ module.exports = { builder (yargs) { return yargs - .epilog(utils.ipfsPathHelp) + .epilog(ipfsPathHelp) .option('enable-sharding-experiment', { type: 'boolean', default: false @@ -40,14 +36,25 @@ module.exports = { argv.resolve((async () => { print('Initializing IPFS daemon...') - const repoPath = utils.getRepoPath() + const repoPath = getRepoPath() // Required inline to reduce startup time - const HttpAPI = require('../../http') - httpAPI = new HttpAPI(process.env.IPFS_PATH, null, argv) + const HttpApi = require('../../http') + const api = new HttpApi({ + silent: argv.silent, + repo: process.env.IPFS_PATH, + offline: argv.offline, + pass: argv.pass, + EXPERIMENTAL: { + pubsub: argv.enablePubsubExperiment, + ipnsPubsub: argv.enableNamesysPubsub, + dht: argv.enableDhtExperiment, + sharding: argv.enableShardingExperiment + } + }) try { - await promisify(httpAPI.start)() + await api.start() } catch (err) { if (err.code === 'ENOENT' && err.message.match(/uninitialized/i)) { print('Error: no initialized ipfs repo found in ' + repoPath) @@ -61,7 +68,7 @@ module.exports = { const cleanup = async () => { print(`Received interrupt signal, shutting down..`) - await promisify(httpAPI.stop)() + await api.stop() process.exit(0) } diff --git a/src/http/gateway/resources/gateway.js b/src/http/gateway/resources/gateway.js index 4f6cc67b8c..805b5bb620 100644 --- a/src/http/gateway/resources/gateway.js +++ b/src/http/gateway/resources/gateway.js @@ -4,14 +4,15 @@ const debug = require('debug') const log = debug('jsipfs:http-gateway') log.error = debug('jsipfs:http-gateway:error') const pull = require('pull-stream') -const toPull = require('stream-to-pull-stream') +const pushable = require('pull-pushable') +const toStream = require('pull-stream-to-stream') const fileType = require('file-type') const mime = require('mime-types') -const Stream = require('readable-stream') -const CID = require('cids') +const { PassThrough } = require('readable-stream') const { resolver } = require('ipfs-http-response') const PathUtils = require('../utils/path') +const { cidToString } = require('../../../utils/cid') function detectContentType (ref, chunk) { let fileSignature @@ -29,151 +30,140 @@ function detectContentType (ref, chunk) { } module.exports = { - checkCID: (request, reply) => { + checkCID (request, h) { if (!request.params.cid) { - return reply({ + return h.response({ Message: 'Path Resolve error: path must contain at least one component', Code: 0, Type: 'error' }).code(400).takeover() } - return reply({ - ref: `/ipfs/${request.params.cid}` - }) + return { ref: `/ipfs/${request.params.cid}` } }, - handler: (request, reply) => { - const ref = request.pre.args.ref - const ipfs = request.server.app.ipfs - - function handleGatewayResolverError (err) { - if (err) { - log.error('err: ', err.toString(), ' fileName: ', err.fileName) - - const errorToString = err.toString() - // switch case with true feels so wrong. - switch (true) { - case (errorToString === 'Error: This dag node is a directory'): - resolver.directory(ipfs, ref, err.fileName, (err, data) => { - if (err) { - log.error(err) - return reply(err.toString()).code(500) - } - if (typeof data === 'string') { - // no index file found - if (!ref.endsWith('/')) { - // for a directory, if URL doesn't end with a / - // append / and redirect permanent to that URL - return reply.redirect(`${ref}/`).permanent(true) - } else { - // send directory listing - return reply(data) - } - } else { - // found index file - // redirect to URL/ - return reply.redirect(PathUtils.joinURLParts(ref, data[0].name)) - } - }) - break - case (errorToString.startsWith('Error: no link named')): - return reply(errorToString).code(404) - case (errorToString.startsWith('Error: multihash length inconsistent')): - case (errorToString.startsWith('Error: Non-base58 character')): - return reply({ Message: errorToString, Code: 0, Type: 'error' }).code(400) - default: + async handler (request, h) { + const { ref } = request.pre.args + const { ipfs } = request.server.app + + let data + try { + data = await resolver.cid(ipfs, ref) + } catch (err) { + const errorToString = err.toString() + log.error('err: ', errorToString, ' fileName: ', err.fileName) + + // switch case with true feels so wrong. + switch (true) { + case (errorToString === 'Error: This dag node is a directory'): + try { + data = await resolver.directory(ipfs, ref, err.cid) + } catch (err) { log.error(err) - return reply({ Message: errorToString, Code: 0, Type: 'error' }).code(500) - } + return h.response(err.toString()).code(500) + } + + if (typeof data === 'string') { + // no index file found + if (!ref.endsWith('/')) { + // for a directory, if URL doesn't end with a / + // append / and redirect permanent to that URL + return h.redirect(`${ref}/`).permanent(true) + } + // send directory listing + return h.response(data) + } + + // found index file + // redirect to URL/ + return h.redirect(PathUtils.joinURLParts(ref, data[0].name)) + case (errorToString.startsWith('Error: no link named')): + return h.response(errorToString).code(404) + case (errorToString.startsWith('Error: multihash length inconsistent')): + case (errorToString.startsWith('Error: Non-base58 character')): + return h.response({ Message: errorToString, Code: 0, Type: 'error' }).code(400) + default: + log.error(err) + return h.response({ Message: errorToString, Code: 0, Type: 'error' }).code(500) } } - return resolver.multihash(ipfs, ref, (err, data) => { - if (err) { - return handleGatewayResolverError(err) - } + if (ref.endsWith('/')) { + // remove trailing slash for files + return h.redirect(PathUtils.removeTrailingSlash(ref)).permanent(true) + } - const stream = ipfs.catReadableStream(data.multihash) - stream.once('error', (err) => { - if (err) { - log.error(err) - return reply(err.toString()).code(500) - } - }) - - if (ref.endsWith('/')) { - // remove trailing slash for files - return reply - .redirect(PathUtils.removeTrailingSlash(ref)) - .permanent(true) - } else { - if (!stream._read) { - stream._read = () => {} - stream._readableState = {} - } - - // response.continue() - let contentTypeDetected = false - let stream2 = new Stream.PassThrough({ highWaterMark: 1 }) - stream2.on('error', (err) => { - log.error('stream2 err: ', err) - }) - - let response = reply(stream2).hold() - - // Etag maps directly to an identifier for a specific version of a resource - // TODO: change to .cid.toBaseEncodedString() after switch to new js-ipfs-http-response - response.header('Etag', `"${data.multihash}"`) - - // Set headers specific to the immutable namespace - if (ref.startsWith('/ipfs/')) { - response.header('Cache-Control', 'public, max-age=29030400, immutable') - } - - pull( - toPull.source(stream), - pull.through((chunk) => { - // Guess content-type (only once) - if (chunk.length > 0 && !contentTypeDetected) { - let contentType = detectContentType(ref, chunk) - contentTypeDetected = true + return new Promise((resolve, reject) => { + let pusher + let started = false + + pull( + ipfs.catPullStream(data.cid), + pull.drain( + chunk => { + if (!started) { + started = true + pusher = pushable() + const res = h.response(toStream.source(pusher).pipe(new PassThrough())) + + // Etag maps directly to an identifier for a specific version of a resource + res.header('Etag', `"${data.cid}"`) + + // Set headers specific to the immutable namespace + if (ref.startsWith('/ipfs/')) { + res.header('Cache-Control', 'public, max-age=29030400, immutable') + } + + const contentType = detectContentType(ref, chunk) log('ref ', ref) log('mime-type ', contentType) if (contentType) { log('writing content-type header') - response.header('Content-Type', contentType) + res.header('Content-Type', contentType) } - response.send() + resolve(res) } + pusher.push(chunk) + }, + err => { + if (err) { + log.error(err) + + // We already started flowing, abort the stream + if (started) { + return pusher.end(err) + } - stream2.write(chunk) - }), - pull.onEnd(() => { - log('stream ended.') - stream2.end() - }) + return resolve(h.response({ + Message: err.message, + Code: 0, + Type: 'error' + }).code(500).takeover()) + } + + pusher.end() + } ) - } + ) }) }, - afterHandler: (request, reply) => { - const response = request.response + afterHandler (request, h) { + const { response } = request if (response.statusCode === 200) { - const ref = request.pre.args.ref + const { ref } = request.pre.args response.header('X-Ipfs-Path', ref) if (ref.startsWith('/ipfs/')) { const rootCid = ref.split('/')[2] - const ipfsOrigin = new CID(rootCid).toV1().toBaseEncodedString('base32') + const ipfsOrigin = cidToString(rootCid, { base: 'base32' }) response.header('Suborigin', 'ipfs000' + ipfsOrigin) } // TODO: we don't have case-insensitive solution for /ipns/ yet (https://github.com/ipfs/go-ipfs/issues/5287) } - reply.continue() + return response } } diff --git a/src/http/gateway/routes/gateway.js b/src/http/gateway/routes/gateway.js index 810a520437..eb8543d465 100644 --- a/src/http/gateway/routes/gateway.js +++ b/src/http/gateway/routes/gateway.js @@ -2,20 +2,16 @@ const resources = require('../resources') -module.exports = (server) => { - const gateway = server.select('Gateway') - - gateway.route({ - method: '*', - path: '/ipfs/{cid*}', - config: { - pre: [ - { method: resources.gateway.checkCID, assign: 'args' } - ], - handler: resources.gateway.handler, - ext: { - onPostHandler: { method: resources.gateway.afterHandler } - } +module.exports = { + method: '*', + path: '/ipfs/{cid*}', + options: { + handler: resources.gateway.handler, + pre: [ + { method: resources.gateway.checkCID, assign: 'args' } + ], + ext: { + onPostHandler: { method: resources.gateway.afterHandler } } - }) + } } diff --git a/src/http/gateway/routes/index.js b/src/http/gateway/routes/index.js index 0e0656c258..2cbf163b04 100644 --- a/src/http/gateway/routes/index.js +++ b/src/http/gateway/routes/index.js @@ -1,5 +1,3 @@ 'use strict' -module.exports = (server) => { - require('./gateway')(server) -} +module.exports = [require('./gateway')] diff --git a/src/http/index.js b/src/http/index.js index f35b05d075..aa28870323 100644 --- a/src/http/index.js +++ b/src/http/index.js @@ -1,11 +1,11 @@ 'use strict' -const series = require('async/series') const Hapi = require('hapi') +const Pino = require('hapi-pino') const debug = require('debug') const multiaddr = require('multiaddr') -const setHeader = require('hapi-set-header') -const once = require('once') +// const setHeader = require('hapi-set-header') +const promisify = require('promisify-es6') const IPFS = require('../core') const WStar = require('libp2p-webrtc-star') @@ -13,184 +13,167 @@ const TCP = require('libp2p-tcp') const MulticastDNS = require('libp2p-mdns') const WS = require('libp2p-websockets') const Bootstrap = require('libp2p-bootstrap') -const errorHandler = require('./error-handler') +// const errorHandler = require('./error-handler') function uriToMultiaddr (uri) { const ipPort = uri.split('/')[2].split(':') return `/ip4/${ipPort[0]}/tcp/${ipPort[1]}` } -function HttpApi (repo, config, cliArgs) { - cliArgs = cliArgs || {} - this.node = undefined - this.server = undefined - - this.log = debug('jsipfs:http-api') - this.log.error = debug('jsipfs:http-api:error') - - if (process.env.IPFS_MONITORING) { - // Setup debug metrics collection - const prometheusClient = require('prom-client') - const prometheusGcStats = require('prometheus-gc-stats') - const collectDefaultMetrics = prometheusClient.collectDefaultMetrics - collectDefaultMetrics({ timeout: 5000 }) - prometheusGcStats(prometheusClient.register)() +class HttpApi { + constructor (options) { + this._options = options || {} + this._log = debug('jsipfs:http-api') + this._log.error = debug('jsipfs:http-api:error') + + if (process.env.IPFS_MONITORING) { + // Setup debug metrics collection + const prometheusClient = require('prom-client') + const prometheusGcStats = require('prometheus-gc-stats') + const collectDefaultMetrics = prometheusClient.collectDefaultMetrics + collectDefaultMetrics({ timeout: 5000 }) + prometheusGcStats(prometheusClient.register)() + } } - this.start = (init, callback) => { - if (typeof init === 'function') { - callback = init - init = false + async start () { + this._log('starting') + + const libp2p = { modules: {} } + + // Attempt to use any of the WebRTC versions available globally + let electronWebRTC + let wrtc + try { + electronWebRTC = require('electron-webrtc')() + } catch (err) { + this._log('failed to load optional electron-webrtc dependency') + } + try { + wrtc = require('wrtc') + } catch (err) { + this._log('failed to load optional webrtc dependency') } - this.log('starting') - - series([ - (cb) => { - cb = once(cb) - - const libp2p = { modules: {} } - - // Attempt to use any of the WebRTC versions available globally - let electronWebRTC - let wrtc - try { - electronWebRTC = require('electron-webrtc')() - } catch (err) { - this.log('failed to load optional electron-webrtc dependency') - } - try { - wrtc = require('wrtc') - } catch (err) { - this.log('failed to load optional webrtc dependency') - } - - if (wrtc || electronWebRTC) { - const using = wrtc ? 'wrtc' : 'electron-webrtc' - this.log(`Using ${using} for webrtc support`) - const wstar = new WStar({ wrtc: (wrtc || electronWebRTC) }) - libp2p.modules.transport = [TCP, WS, wstar] - libp2p.modules.peerDiscovery = [MulticastDNS, Bootstrap, wstar.discovery] - } - - // try-catch so that programmer errors are not swallowed during testing - try { - // start the daemon - this.node = new IPFS({ - silent: cliArgs.silent, - repo: repo, - init: init, - start: true, - config: config, - offline: cliArgs.offline, - pass: cliArgs.pass, - EXPERIMENTAL: { - pubsub: cliArgs.enablePubsubExperiment, - ipnsPubsub: cliArgs.enableNamesysPubsub, - dht: cliArgs.enableDhtExperiment, - sharding: cliArgs.enableShardingExperiment - }, - libp2p: libp2p - }) - } catch (err) { - return cb(err) - } - - this.node.once('error', (err) => { - this.log('error starting core', err) - err.code = 'ENOENT' - cb(err) - }) - this.node.once('start', cb) - }, - (cb) => { - this.log('fetching config') - this.node._repo.config.get((err, config) => { - if (err) { - return callback(err) - } - - // CORS is enabled by default - // TODO: shouldn't, fix this - this.server = new Hapi.Server({ - connections: { - routes: { - cors: true - } - }, - debug: process.env.DEBUG ? { - request: ['*'], - log: ['*'] - } : undefined - }) - - this.server.app.ipfs = this.node - const api = config.Addresses.API.split('/') - const gateway = config.Addresses.Gateway.split('/') - - // select which connection with server.select(