Skip to content

Commit 27a8e2a

Browse files
authored
Updated abort behavior (#1141)
* Updated abort behavior - Support for aborting a request with the promise api - Aborting a request will cause a RequestAbortedError - Normalized Connection class errors, now every error returned is wrapped by the client errors constructors * Updated test * Updated docs * Updated code generation script * Renamed test * Code coverage * Avoid calling twice transport.request
1 parent 953a803 commit 27a8e2a

16 files changed

+823
-647
lines changed

docs/usage.asciidoc

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -67,54 +67,46 @@ client.search({
6767

6868
=== Aborting a request
6969

70-
When using the callback style API, the function also returns an object that
71-
allows you to abort the API request.
70+
If needed, you can abort a running request by calling the `request.abort()` method returned by the API.
71+
72+
CAUTION: If you abort a request, the request will fail with a `RequestAbortedError`.
7273

7374

7475
[source,js]
7576
----
76-
// calback API
7777
const request = client.search({
7878
index: 'my-index',
7979
body: { foo: 'bar' }
8080
}, {
8181
ignore: [404],
8282
maxRetries: 3
83-
}, (err, { body }) => {
84-
if (err) console.log(err)
83+
}, (err, result) => {
84+
if (err) {
85+
console.log(err) // RequestAbortedError
86+
} else {
87+
console.log(result)
88+
}
8589
})
8690
8791
request.abort()
8892
----
8993

90-
Aborting a request with the promise style API is not supported, but you can
91-
achieve that with convenience wrapper.
92-
94+
The same behavior is valid for the promise style API as well.
9395
[source,js]
9496
----
95-
function abortableRequest (params, options) {
96-
var request = null
97-
const promise = new Promise((resolve, reject) => {
98-
request = client.search(params, options, (err, result) => {
99-
err ? reject(err) : resolve(res)
100-
})
101-
})
102-
return {
103-
promise,
104-
abort: () => request.abort()
105-
}
106-
}
107-
108-
const request = abortableRequest({
97+
const request = client.search({
10998
index: 'my-index',
11099
body: { foo: 'bar' }
111100
}, {
112101
ignore: [404],
113102
maxRetries: 3
114103
})
115104
105+
request
106+
.then(result => console.log(result))
107+
.catch(err => console.log(err)) // RequestAbortedError
108+
116109
request.abort()
117-
// access the promise with `request.promise.[method]`
118110
----
119111

120112

@@ -213,6 +205,9 @@ You can find the errors exported by the client in the table below.
213205
|`ConnectionError`
214206
|Generated when an error occurs during the request, it can be a connection error or a malformed stream of data.
215207

208+
|`RequestAbortedError`
209+
|Generated if the user calls the `request.abort()` method.
210+
216211
|`NoLivingConnectionsError`
217212
|Given the configuration, the ConnectionPool was not able to find a usable Connection for this request.
218213

index.d.ts

Lines changed: 562 additions & 561 deletions
Large diffs are not rendered by default.

lib/Connection.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@ const debug = require('debug')('elasticsearch')
1212
const decompressResponse = require('decompress-response')
1313
const pump = require('pump')
1414
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
15-
const { TimeoutError, ConfigurationError } = require('./errors')
15+
const {
16+
ConnectionError,
17+
RequestAbortedError,
18+
TimeoutError,
19+
ConfigurationError
20+
} = require('./errors')
1621

1722
class Connection {
1823
constructor (opts = {}) {
@@ -95,7 +100,7 @@ class Connection {
95100
if (ended === false) {
96101
ended = true
97102
this._openRequests--
98-
callback(err, null)
103+
callback(new ConnectionError(err.message), null)
99104
}
100105
})
101106

@@ -105,6 +110,7 @@ class Connection {
105110
if (ended === false) {
106111
ended = true
107112
this._openRequests--
113+
callback(new RequestAbortedError(), null)
108114
}
109115
})
110116

lib/Transport.d.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import * as errors from './errors';
1111
export type ApiError = errors.ConfigurationError | errors.ConnectionError |
1212
errors.DeserializationError | errors.SerializationError |
1313
errors.NoLivingConnectionsError | errors.ResponseError |
14-
errors.TimeoutError
14+
errors.TimeoutError | errors.RequestAbortedError
1515

1616
export interface nodeSelectorFn {
1717
(connections: Connection[]): Connection;
@@ -102,6 +102,10 @@ export interface TransportRequestCallback {
102102
abort: () => void;
103103
}
104104

105+
export interface TransportRequestPromise<T> extends Promise<T> {
106+
abort: () => void;
107+
}
108+
105109
export interface TransportGetConnectionOptions {
106110
requestId: string;
107111
}

lib/Transport.js

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const intoStream = require('into-stream')
1212
const ms = require('ms')
1313
const {
1414
ConnectionError,
15-
TimeoutError,
15+
RequestAbortedError,
1616
NoLivingConnectionsError,
1717
ResponseError,
1818
ConfigurationError
@@ -70,14 +70,19 @@ class Transport {
7070
callback = options
7171
options = {}
7272
}
73+
var p = null
7374

7475
// promises support
75-
if (callback == null) {
76-
return new Promise((resolve, reject) => {
77-
this.request(params, options, (err, result) => {
78-
err ? reject(err) : resolve(result)
79-
})
76+
if (callback === undefined) {
77+
let onFulfilled = null
78+
let onRejected = null
79+
p = new Promise((resolve, reject) => {
80+
onFulfilled = resolve
81+
onRejected = reject
8082
})
83+
callback = function callback (err, result) {
84+
err ? onRejected(err) : onFulfilled(result)
85+
}
8186
}
8287

8388
callback = once(callback)
@@ -107,7 +112,9 @@ class Transport {
107112
var request = { abort: noop }
108113

109114
const makeRequest = () => {
110-
if (meta.aborted === true) return
115+
if (meta.aborted === true) {
116+
return callback(new RequestAbortedError(), result)
117+
}
111118
meta.connection = this.getConnection({ requestId: meta.request.id })
112119
if (meta.connection == null) {
113120
return callback(new NoLivingConnectionsError(), result)
@@ -190,35 +197,30 @@ class Transport {
190197

191198
const onResponse = (err, response) => {
192199
if (err !== null) {
193-
// if there is an error in the connection
194-
// let's mark the connection as dead
195-
this.connectionPool.markDead(meta.connection)
196-
197-
if (this.sniffOnConnectionFault === true) {
198-
this.sniff({
199-
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
200-
requestId: meta.request.id
201-
})
202-
}
203-
204-
// retry logic
205-
if (meta.attempts < maxRetries) {
206-
meta.attempts++
207-
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
208-
request = makeRequest(params, callback)
209-
return
210-
}
200+
if (err.name !== 'RequestAbortedError') {
201+
// if there is an error in the connection
202+
// let's mark the connection as dead
203+
this.connectionPool.markDead(meta.connection)
211204

212-
const error = err instanceof TimeoutError
213-
? err
214-
: new ConnectionError(err.message, result)
205+
if (this.sniffOnConnectionFault === true) {
206+
this.sniff({
207+
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
208+
requestId: meta.request.id
209+
})
210+
}
215211

216-
if (err.name === 'TimeoutError') {
217-
err.meta = result
212+
// retry logic
213+
if (meta.attempts < maxRetries) {
214+
meta.attempts++
215+
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
216+
request = makeRequest(params, callback)
217+
return
218+
}
218219
}
219220

220-
this.emit('response', error, result)
221-
return callback(error, result)
221+
err.meta = result
222+
this.emit('response', err, result)
223+
return callback(err, result)
222224
}
223225

224226
const { statusCode, headers } = response
@@ -310,10 +312,17 @@ class Transport {
310312
request = makeRequest()
311313

312314
return {
313-
abort: () => {
315+
then (onFulfilled, onRejected) {
316+
return p.then(onFulfilled, onRejected)
317+
},
318+
catch (onRejected) {
319+
return p.catch(onRejected)
320+
},
321+
abort () {
314322
meta.aborted = true
315323
request.abort()
316324
debug('Aborting request', params)
325+
return this
317326
}
318327
}
319328
}

lib/errors.d.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,10 @@ export declare class ResponseError extends ElasticsearchClientError {
5959
headers: Record<string, any>;
6060
constructor(meta: ApiResponse);
6161
}
62+
63+
export declare class RequestAbortedError extends ElasticsearchClientError {
64+
name: string;
65+
message: string;
66+
meta: ApiResponse;
67+
constructor(message: string, meta: ApiResponse);
68+
}

lib/errors.js

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,16 @@ class ResponseError extends ElasticsearchClientError {
9595
}
9696
}
9797

98+
class RequestAbortedError extends ElasticsearchClientError {
99+
constructor (message, meta) {
100+
super(message)
101+
Error.captureStackTrace(this, RequestAbortedError)
102+
this.name = 'RequestAbortedError'
103+
this.message = message || 'Request aborted'
104+
this.meta = meta
105+
}
106+
}
107+
98108
module.exports = {
99109
ElasticsearchClientError,
100110
TimeoutError,
@@ -103,5 +113,6 @@ module.exports = {
103113
SerializationError,
104114
DeserializationError,
105115
ConfigurationError,
106-
ResponseError
116+
ResponseError,
117+
RequestAbortedError
107118
}

scripts/utils/generateMain.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,14 @@ function buildMethodDefinition (api, name, hasBody) {
183183

184184
if (hasBody) {
185185
let methods = [
186-
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
186+
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
187187
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
188188
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
189189
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }
190190
]
191191
if (isSnakeCased(api)) {
192192
methods = methods.concat([
193-
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
193+
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
194194
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
195195
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
196196
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }
@@ -199,14 +199,14 @@ function buildMethodDefinition (api, name, hasBody) {
199199
return methods
200200
} else {
201201
let methods = [
202-
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
202+
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
203203
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
204204
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
205205
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }
206206
]
207207
if (isSnakeCased(api)) {
208208
methods = methods.concat([
209-
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
209+
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
210210
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
211211
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
212212
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }

test/behavior/sniff.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ test('Sniff on connection fault', t => {
210210
class MyConnection extends Connection {
211211
request (params, callback) {
212212
if (this.id === 'http://localhost:9200/') {
213-
callback(new Error('kaboom'), null)
213+
callback(new errors.ConnectionError('kaboom'), null)
214214
return {}
215215
} else {
216216
return super.request(params, callback)

0 commit comments

Comments
 (0)