Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Set the FileStreamConverter explicitly #701

Merged
merged 2 commits into from
Mar 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/files/add-pull-stream.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
'use strict'

const SendFilesStream = require('../utils/send-files-stream')
const FileResultStreamConverter = require('../utils/file-result-stream-converter')
const toPull = require('stream-to-pull-stream')

module.exports = (send) => (options) => toPull(SendFilesStream(send, 'add')(options))
module.exports = (send) => {
return (options) => {
options = options || {}
options.converter = FileResultStreamConverter
return toPull(SendFilesStream(send, 'add')(options))
}
}
9 changes: 8 additions & 1 deletion src/files/add-readable-stream.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
'use strict'

const SendFilesStream = require('../utils/send-files-stream')
const FileResultStreamConverter = require('../utils/file-result-stream-converter')

module.exports = (send) => SendFilesStream(send, 'add')
module.exports = (send) => {
return (options) => {
options = options || {}
options.converter = FileResultStreamConverter
return SendFilesStream(send, 'add')(options)
}
}
2 changes: 2 additions & 0 deletions src/files/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const ConcatStream = require('concat-stream')
const once = require('once')
const isStream = require('is-stream')
const OtherBuffer = require('buffer').Buffer
const FileResultStreamConverter = require('../utils/file-result-stream-converter')
const SendFilesStream = require('../utils/send-files-stream')

module.exports = (send) => {
Expand All @@ -21,6 +22,7 @@ module.exports = (send) => {
if (!options) {
options = {}
}
options.converter = FileResultStreamConverter

const ok = Buffer.isBuffer(_files) ||
isStream.readable(_files) ||
Expand Down
4 changes: 3 additions & 1 deletion src/files/write.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const promisify = require('promisify-es6')
const concatStream = require('concat-stream')
const once = require('once')
const FileResultStreamConverter = require('../utils/file-result-stream-converter')
const SendFilesStream = require('../utils/send-files-stream')

module.exports = (send) => {
Expand All @@ -28,7 +29,8 @@ module.exports = (send) => {

const options = {
args: pathDst,
qs: opts
qs: opts,
converter: FileResultStreamConverter
}

const stream = sendFilesStream(options)
Expand Down
7 changes: 6 additions & 1 deletion src/util/fs-add.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const isNode = require('detect-node')
const promisify = require('promisify-es6')
const moduleConfig = require('../utils/module-config')
const SendOneFile = require('../utils/send-one-file-multiple-results')
const FileResultStreamConverter = require('../utils/file-result-stream-converter')

module.exports = (arg) => {
const sendOneFile = SendOneFile(moduleConfig(arg), 'add')
Expand Down Expand Up @@ -31,6 +32,10 @@ module.exports = (arg) => {
return callback(new Error('"path" must be a string'))
}

sendOneFile(path, { qs: opts }, callback)
const requestOpts = {
qs: opts,
converter: FileResultStreamConverter
}
sendOneFile(path, requestOpts, callback)
})
}
7 changes: 6 additions & 1 deletion src/util/url-add.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const parseUrl = require('url').parse
const request = require('../utils/request')
const moduleConfig = require('../utils/module-config')
const SendOneFile = require('../utils/send-one-file-multiple-results')
const FileResultStreamConverter = require('../utils/file-result-stream-converter')

module.exports = (arg) => {
const sendOneFile = SendOneFile(moduleConfig(arg), 'add')
Expand Down Expand Up @@ -49,7 +50,11 @@ const requestWithRedirect = (url, opts, sendOneFile, callback) => {
}
requestWithRedirect(redirection, opts, sendOneFile, callback)
} else {
sendOneFile(res, { qs: opts }, callback)
const requestOpts = {
qs: opts,
converter: FileResultStreamConverter
}
sendOneFile(res, requestOpts, callback)
}
}).end()
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
'use strict'

const pump = require('pump')
const TransformStream = require('readable-stream').Transform
const streamToValue = require('./stream-to-value')

/*
Transforms a stream of {Name, Hash} objects to include size
of the DAG object.

Usage: inputStream.pipe(new Converter())
Usage: inputStream.pipe(new FileResultStreamConverter())

Input object format:
{
Expand All @@ -24,7 +22,7 @@ const streamToValue = require('./stream-to-value')
size: 20
}
*/
class ConverterStream extends TransformStream {
class FileResultStreamConverter extends TransformStream {
constructor (options) {
const opts = Object.assign({}, options || {}, { objectMode: true })
super(opts)
Expand All @@ -43,19 +41,4 @@ class ConverterStream extends TransformStream {
}
}

function converter (inputStream, callback) {
const outputStream = new ConverterStream()
pump(
inputStream,
outputStream,
(err) => {
if (err) {
callback(err)
}
})

streamToValue(outputStream, callback)
}

exports = module.exports = converter
exports.ConverterStream = ConverterStream
module.exports = FileResultStreamConverter
31 changes: 21 additions & 10 deletions src/utils/send-files-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const isStream = require('is-stream')
const once = require('once')
const prepareFile = require('./prepare-file')
const Multipart = require('./multipart')
const Converter = require('./converter').ConverterStream

function headers (file) {
const name = file.path
Expand Down Expand Up @@ -127,15 +126,27 @@ module.exports = (send, path) => {

response.on('error', (err) => retStream.emit('error', err))

response.on('data', (d) => {
if (d.Bytes && options.progress) {
options.progress(d.Bytes)
}
})
const convertedResponse = new Converter()
convertedResponse.once('end', () => retStream.push(null))
convertedResponse.on('data', (d) => retStream.push(d))
response.pipe(convertedResponse)
if (options.converter) {
response.on('data', (d) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because you're not returning the converted response, the back-pressure here doesn't work.
Instead, I suggest that you create a duplex stream from the input stream and the converted response, and that's what you would return. (This way, the client can do back-pressure: if the result data is being processed too slowly and the server implements back-pressure properly, the back-pressure should propagate all the way to the server and back into the request stream).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps you could use duplexify to create a duplex stream from the writable (the client pushing files) and the readable (the result stream).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a bug in the original code? I don't see how my code is different from the original one in case a converter is set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vmx I see, this is a problem that was already there before, I only spotted it now.
Feel free to ignore this (and we should probably open an issue with this one). :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did the issue get created?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saw it now.

if (d.Bytes && options.progress) {
options.progress(d.Bytes)
}
})

const Converter = options.converter
const convertedResponse = new Converter()
convertedResponse.once('end', () => retStream.push(null))
convertedResponse.on('data', (d) => retStream.push(d))
response.pipe(convertedResponse)
} else {
response.on('data', (d) => {
if (d.Bytes && options.progress) {
options.progress(d.Bytes)
}
retStream.push(d)
})
response.once('end', () => retStream.push(null))
}
})

// signal the multipart that the underlying stream has drained and that
Expand Down