Skip to content

Updated abort behavior #1141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 18 additions & 23 deletions docs/usage.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,54 +67,46 @@ client.search({

=== Aborting a request

When using the callback style API, the function also returns an object that
allows you to abort the API request.
If needed, you can abort a running request by calling the `request.abort()` method returned by the API.

CAUTION: If you abort a request, the request will fail with a `RequestAbortedError`.


[source,js]
----
// calback API
const request = client.search({
index: 'my-index',
body: { foo: 'bar' }
}, {
ignore: [404],
maxRetries: 3
}, (err, { body }) => {
if (err) console.log(err)
}, (err, result) => {
if (err) {
console.log(err) // RequestAbortedError
} else {
console.log(result)
}
})

request.abort()
----

Aborting a request with the promise style API is not supported, but you can
achieve that with convenience wrapper.

The same behavior is valid for the promise style API as well.
[source,js]
----
function abortableRequest (params, options) {
var request = null
const promise = new Promise((resolve, reject) => {
request = client.search(params, options, (err, result) => {
err ? reject(err) : resolve(res)
})
})
return {
promise,
abort: () => request.abort()
}
}

const request = abortableRequest({
const request = client.search({
index: 'my-index',
body: { foo: 'bar' }
}, {
ignore: [404],
maxRetries: 3
})

request
.then(result => console.log(result))
.catch(err => console.log(err)) // RequestAbortedError

request.abort()
// access the promise with `request.promise.[method]`
----


Expand Down Expand Up @@ -213,6 +205,9 @@ You can find the errors exported by the client in the table below.
|`ConnectionError`
|Generated when an error occurs during the request, it can be a connection error or a malformed stream of data.

|`RequestAbortedError`
|Generated if the user calls the `request.abort()` method.

|`NoLivingConnectionsError`
|Given the configuration, the ConnectionPool was not able to find a usable Connection for this request.

Expand Down
1,123 changes: 562 additions & 561 deletions index.d.ts

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions lib/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ const debug = require('debug')('elasticsearch')
const decompressResponse = require('decompress-response')
const pump = require('pump')
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
const { TimeoutError, ConfigurationError } = require('./errors')
const {
ConnectionError,
RequestAbortedError,
TimeoutError,
ConfigurationError
} = require('./errors')

class Connection {
constructor (opts = {}) {
Expand Down Expand Up @@ -95,7 +100,7 @@ class Connection {
if (ended === false) {
ended = true
this._openRequests--
callback(err, null)
callback(new ConnectionError(err.message), null)
}
})

Expand All @@ -105,6 +110,7 @@ class Connection {
if (ended === false) {
ended = true
this._openRequests--
callback(new RequestAbortedError(), null)
}
})

Expand Down
6 changes: 5 additions & 1 deletion lib/Transport.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import * as errors from './errors';
export type ApiError = errors.ConfigurationError | errors.ConnectionError |
errors.DeserializationError | errors.SerializationError |
errors.NoLivingConnectionsError | errors.ResponseError |
errors.TimeoutError
errors.TimeoutError | errors.RequestAbortedError

export interface nodeSelectorFn {
(connections: Connection[]): Connection;
Expand Down Expand Up @@ -102,6 +102,10 @@ export interface TransportRequestCallback {
abort: () => void;
}

export interface TransportRequestPromise<T> extends Promise<T> {
abort: () => void;
}

export interface TransportGetConnectionOptions {
requestId: string;
}
Expand Down
75 changes: 42 additions & 33 deletions lib/Transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const intoStream = require('into-stream')
const ms = require('ms')
const {
ConnectionError,
TimeoutError,
RequestAbortedError,
NoLivingConnectionsError,
ResponseError,
ConfigurationError
Expand Down Expand Up @@ -70,14 +70,19 @@ class Transport {
callback = options
options = {}
}
var p = null

// promises support
if (callback == null) {
return new Promise((resolve, reject) => {
this.request(params, options, (err, result) => {
err ? reject(err) : resolve(result)
})
if (callback === undefined) {
let onFulfilled = null
let onRejected = null
p = new Promise((resolve, reject) => {
onFulfilled = resolve
onRejected = reject
})
callback = function callback (err, result) {
err ? onRejected(err) : onFulfilled(result)
}
}

callback = once(callback)
Expand Down Expand Up @@ -107,7 +112,9 @@ class Transport {
var request = { abort: noop }

const makeRequest = () => {
if (meta.aborted === true) return
if (meta.aborted === true) {
return callback(new RequestAbortedError(), result)
}
meta.connection = this.getConnection({ requestId: meta.request.id })
if (meta.connection == null) {
return callback(new NoLivingConnectionsError(), result)
Expand Down Expand Up @@ -190,35 +197,30 @@ class Transport {

const onResponse = (err, response) => {
if (err !== null) {
// if there is an error in the connection
// let's mark the connection as dead
this.connectionPool.markDead(meta.connection)

if (this.sniffOnConnectionFault === true) {
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id
})
}

// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
request = makeRequest(params, callback)
return
}
if (err.name !== 'RequestAbortedError') {
// if there is an error in the connection
// let's mark the connection as dead
this.connectionPool.markDead(meta.connection)

const error = err instanceof TimeoutError
? err
: new ConnectionError(err.message, result)
if (this.sniffOnConnectionFault === true) {
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id
})
}

if (err.name === 'TimeoutError') {
err.meta = result
// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
request = makeRequest(params, callback)
return
}
}

this.emit('response', error, result)
return callback(error, result)
err.meta = result
this.emit('response', err, result)
return callback(err, result)
}

const { statusCode, headers } = response
Expand Down Expand Up @@ -310,10 +312,17 @@ class Transport {
request = makeRequest()

return {
abort: () => {
then (onFulfilled, onRejected) {
return p.then(onFulfilled, onRejected)
},
catch (onRejected) {
return p.catch(onRejected)
},
abort () {
meta.aborted = true
request.abort()
debug('Aborting request', params)
return this
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions lib/errors.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@ export declare class ResponseError extends ElasticsearchClientError {
headers: Record<string, any>;
constructor(meta: ApiResponse);
}

export declare class RequestAbortedError extends ElasticsearchClientError {
name: string;
message: string;
meta: ApiResponse;
constructor(message: string, meta: ApiResponse);
}
13 changes: 12 additions & 1 deletion lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ class ResponseError extends ElasticsearchClientError {
}
}

class RequestAbortedError extends ElasticsearchClientError {
constructor (message, meta) {
super(message)
Error.captureStackTrace(this, RequestAbortedError)
this.name = 'RequestAbortedError'
this.message = message || 'Request aborted'
this.meta = meta
}
}

module.exports = {
ElasticsearchClientError,
TimeoutError,
Expand All @@ -103,5 +113,6 @@ module.exports = {
SerializationError,
DeserializationError,
ConfigurationError,
ResponseError
ResponseError,
RequestAbortedError
}
8 changes: 4 additions & 4 deletions scripts/utils/generateMain.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,14 @@ function buildMethodDefinition (api, name, hasBody) {

if (hasBody) {
let methods = [
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }
]
if (isSnakeCased(api)) {
methods = methods.concat([
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }
Expand All @@ -199,14 +199,14 @@ function buildMethodDefinition (api, name, hasBody) {
return methods
} else {
let methods = [
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }
]
if (isSnakeCased(api)) {
methods = methods.concat([
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }
Expand Down
2 changes: 1 addition & 1 deletion test/behavior/sniff.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ test('Sniff on connection fault', t => {
class MyConnection extends Connection {
request (params, callback) {
if (this.id === 'http://localhost:9200/') {
callback(new Error('kaboom'), null)
callback(new errors.ConnectionError('kaboom'), null)
return {}
} else {
return super.request(params, callback)
Expand Down
Loading