Skip to content

Commit e67b55d

Browse files
authored
Do not retry a request if the body is a stream (#1143)
* Do not retry a request if the body is a stream Refactored the trnasport.request method to not use stream for gzipping the body, but use the callback API instead. The maxRetries will be 0 in case of a stream body and cached the Accept-Encoding header. * Updated dependencies * Updated test
1 parent d10e8bb commit e67b55d

File tree

4 files changed

+209
-99
lines changed

4 files changed

+209
-99
lines changed

lib/Transport.js

Lines changed: 98 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66

77
const debug = require('debug')('elasticsearch')
88
const os = require('os')
9-
const once = require('once')
10-
const { createGzip } = require('zlib')
11-
const intoStream = require('into-stream')
9+
const { gzip, createGzip } = require('zlib')
1210
const ms = require('ms')
1311
const {
1412
ConnectionError,
@@ -35,7 +33,11 @@ class Transport {
3533
this.requestTimeout = toMs(opts.requestTimeout)
3634
this.suggestCompression = opts.suggestCompression === true
3735
this.compression = opts.compression || false
38-
this.headers = Object.assign({}, { 'User-Agent': userAgent }, opts.headers)
36+
this.headers = Object.assign({},
37+
{ 'User-Agent': userAgent },
38+
opts.suggestCompression === true ? { 'Accept-Encoding': 'gzip,deflate' } : null,
39+
opts.headers
40+
)
3941
this.sniffInterval = opts.sniffInterval
4042
this.sniffOnConnectionFault = opts.sniffOnConnectionFault
4143
this.sniffEndpoint = opts.sniffEndpoint
@@ -85,7 +87,6 @@ class Transport {
8587
}
8688
}
8789

88-
callback = once(callback)
8990
const meta = {
9091
context: options.context || null,
9192
request: {
@@ -107,8 +108,12 @@ class Transport {
107108
meta
108109
}
109110

110-
const maxRetries = options.maxRetries || this.maxRetries
111-
const compression = options.compression || this.compression
111+
// We should not retry if we are sending a stream body, because we should store in memory
112+
// a copy of the stream to be able to send it again, but since we don't know in advance
113+
// the size of the stream, we risk to take too much memory.
114+
// Furthermore, copying everytime the stream is very a expensive operation.
115+
const maxRetries = isStream(params.body) ? 0 : options.maxRetries || this.maxRetries
116+
const compression = options.compression !== undefined ? options.compression : this.compression
112117
var request = { abort: noop }
113118

114119
const makeRequest = () => {
@@ -119,80 +124,9 @@ class Transport {
119124
if (meta.connection == null) {
120125
return callback(new NoLivingConnectionsError(), result)
121126
}
122-
// TODO: make this assignment FAST
123-
const headers = Object.assign({}, this.headers, options.headers)
124-
125-
if (options.opaqueId !== undefined) {
126-
headers['X-Opaque-Id'] = this.opaqueIdPrefix !== null
127-
? this.opaqueIdPrefix + options.opaqueId
128-
: options.opaqueId
129-
}
130-
131-
// handle json body
132-
if (params.body != null) {
133-
if (shouldSerialize(params.body) === true) {
134-
try {
135-
params.body = this.serializer.serialize(params.body)
136-
} catch (err) {
137-
return callback(err, result)
138-
}
139-
}
140-
141-
if (params.body !== '') {
142-
headers['Content-Type'] = headers['Content-Type'] || 'application/json'
143-
if (compression === 'gzip') {
144-
if (isStream(params.body) === false) {
145-
params.body = intoStream(params.body).pipe(createGzip())
146-
} else {
147-
params.body = params.body.pipe(createGzip())
148-
}
149-
headers['Content-Encoding'] = compression
150-
}
151-
}
152-
153-
if (isStream(params.body) === false) {
154-
headers['Content-Length'] = '' + Buffer.byteLength(params.body)
155-
}
156-
// handle ndjson body
157-
} else if (params.bulkBody != null) {
158-
if (shouldSerialize(params.bulkBody) === true) {
159-
try {
160-
params.body = this.serializer.ndserialize(params.bulkBody)
161-
} catch (err) {
162-
return callback(err, result)
163-
}
164-
} else {
165-
params.body = params.bulkBody
166-
}
167-
headers['Content-Type'] = headers['Content-Type'] || 'application/x-ndjson'
168-
if (isStream(params.body) === false) {
169-
headers['Content-Length'] = '' + Buffer.byteLength(params.body)
170-
}
171-
}
172-
173-
if (this.suggestCompression === true) {
174-
headers['Accept-Encoding'] = 'gzip,deflate'
175-
}
176-
177-
params.headers = headers
178-
// serializes the querystring
179-
if (options.querystring == null) {
180-
params.querystring = this.serializer.qserialize(params.querystring)
181-
} else {
182-
params.querystring = this.serializer.qserialize(
183-
Object.assign({}, params.querystring, options.querystring)
184-
)
185-
}
186-
187-
meta.request.params = params
188-
meta.request.options = options
189127
this.emit('request', null, result)
190-
191-
// handles request timeout
192-
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
193-
if (options.asStream === true) params.asStream = true
194128
// perform the actual http request
195-
return meta.connection.request(params, onResponse)
129+
request = meta.connection.request(params, onResponse)
196130
}
197131

198132
const onResponse = (err, response) => {
@@ -213,7 +147,7 @@ class Transport {
213147
if (meta.attempts < maxRetries) {
214148
meta.attempts++
215149
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
216-
request = makeRequest(params, callback)
150+
makeRequest()
217151
return
218152
}
219153
}
@@ -226,7 +160,7 @@ class Transport {
226160
const { statusCode, headers } = response
227161
result.statusCode = statusCode
228162
result.headers = headers
229-
if (headers['warning'] != null) {
163+
if (headers['warning'] !== undefined) {
230164
result.warnings = result.warnings || []
231165
// split the string over the commas not inside quotes
232166
result.warnings.push.apply(result.warnings, headers['warning'].split(/(?!\B"[^"]*),(?![^"]*"\B)/))
@@ -255,7 +189,7 @@ class Transport {
255189
// - a `content-type` is defined and is equal to `application/json`
256190
// - the request is not a HEAD request
257191
// - the payload is not an empty string
258-
if (headers['content-type'] != null &&
192+
if (headers['content-type'] !== undefined &&
259193
headers['content-type'].indexOf('application/json') > -1 &&
260194
isHead === false &&
261195
payload !== ''
@@ -285,7 +219,7 @@ class Transport {
285219
if (meta.attempts < maxRetries && statusCode !== 429) {
286220
meta.attempts++
287221
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
288-
request = makeRequest(params, callback)
222+
makeRequest()
289223
return
290224
}
291225
} else {
@@ -309,7 +243,86 @@ class Transport {
309243
})
310244
}
311245

312-
request = makeRequest()
246+
const headers = Object.assign({}, this.headers, options.headers)
247+
248+
if (options.opaqueId !== undefined) {
249+
headers['X-Opaque-Id'] = this.opaqueIdPrefix !== null
250+
? this.opaqueIdPrefix + options.opaqueId
251+
: options.opaqueId
252+
}
253+
254+
// handle json body
255+
if (params.body != null) {
256+
if (shouldSerialize(params.body) === true) {
257+
try {
258+
params.body = this.serializer.serialize(params.body)
259+
} catch (err) {
260+
return callback(err, result)
261+
}
262+
}
263+
264+
if (params.body !== '') {
265+
headers['Content-Type'] = headers['Content-Type'] || 'application/json'
266+
}
267+
268+
// handle ndjson body
269+
} else if (params.bulkBody != null) {
270+
if (shouldSerialize(params.bulkBody) === true) {
271+
try {
272+
params.body = this.serializer.ndserialize(params.bulkBody)
273+
} catch (err) {
274+
return callback(err, result)
275+
}
276+
} else {
277+
params.body = params.bulkBody
278+
}
279+
if (params.body !== '') {
280+
headers['Content-Type'] = headers['Content-Type'] || 'application/x-ndjson'
281+
}
282+
}
283+
284+
params.headers = headers
285+
// serializes the querystring
286+
if (options.querystring == null) {
287+
params.querystring = this.serializer.qserialize(params.querystring)
288+
} else {
289+
params.querystring = this.serializer.qserialize(
290+
Object.assign({}, params.querystring, options.querystring)
291+
)
292+
}
293+
294+
// handles request timeout
295+
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
296+
if (options.asStream === true) params.asStream = true
297+
meta.request.params = params
298+
meta.request.options = options
299+
300+
// handle compression
301+
if (params.body !== '' && params.body != null) {
302+
if (isStream(params.body) === true) {
303+
if (compression === 'gzip') {
304+
params.headers['Content-Encoding'] = compression
305+
params.body = params.body.pipe(createGzip())
306+
}
307+
makeRequest()
308+
} else if (compression === 'gzip') {
309+
gzip(params.body, (err, buffer) => {
310+
/* istanbul ignore next */
311+
if (err) {
312+
return callback(err, result)
313+
}
314+
params.headers['Content-Encoding'] = compression
315+
params.headers['Content-Length'] = '' + Buffer.byteLength(buffer)
316+
params.body = buffer
317+
makeRequest()
318+
})
319+
} else {
320+
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
321+
makeRequest()
322+
}
323+
} else {
324+
makeRequest()
325+
}
313326

314327
return {
315328
then (onFulfilled, onRejected) {
@@ -405,7 +418,7 @@ function shouldSerialize (obj) {
405418
}
406419

407420
function isStream (obj) {
408-
return typeof obj.pipe === 'function'
421+
return obj != null && typeof obj.pipe === 'function'
409422
}
410423

411424
function defaultNodeFilter (node) {

package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
"deepmerge": "^4.0.0",
4747
"dezalgo": "^1.0.3",
4848
"fast-deep-equal": "^3.1.1",
49+
"into-stream": "^5.1.1",
4950
"js-yaml": "^3.13.1",
5051
"license-checker": "^25.0.1",
5152
"lolex": "^4.0.1",
@@ -66,9 +67,7 @@
6667
"dependencies": {
6768
"debug": "^4.1.1",
6869
"decompress-response": "^4.2.0",
69-
"into-stream": "^5.1.0",
7070
"ms": "^2.1.1",
71-
"once": "^1.4.0",
7271
"pump": "^3.0.0",
7372
"secure-json-parse": "^2.1.0"
7473
},

test/unit/events.test.js

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@ test('Should emit a request event when a request is performed', t => {
3232
method: 'GET',
3333
path: '/test/_search',
3434
body: '',
35-
querystring: 'q=foo%3Abar',
36-
headers: {
37-
'Content-Length': '0'
38-
}
35+
querystring: 'q=foo%3Abar'
3936
},
4037
options: {},
4138
id: 1
@@ -83,10 +80,7 @@ test('Should emit a response event in case of a successful response', t => {
8380
method: 'GET',
8481
path: '/test/_search',
8582
body: '',
86-
querystring: 'q=foo%3Abar',
87-
headers: {
88-
'Content-Length': '0'
89-
}
83+
querystring: 'q=foo%3Abar'
9084
},
9185
options: {},
9286
id: 1
@@ -132,10 +126,7 @@ test('Should emit a response event with the error set', t => {
132126
method: 'GET',
133127
path: '/test/_search',
134128
body: '',
135-
querystring: 'q=foo%3Abar',
136-
headers: {
137-
'Content-Length': '0'
138-
}
129+
querystring: 'q=foo%3Abar'
139130
},
140131
options: {
141132
requestTimeout: 500

0 commit comments

Comments
 (0)