Skip to content

Commit d439aac

Browse files
core: add retry logic to read streams
1 parent 07ceb99 commit d439aac

File tree

6 files changed

+436
-397
lines changed

6 files changed

+436
-397
lines changed

lib/common/util.js

Lines changed: 100 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ var request = require('request').defaults({
2929
maxSockets: Infinity
3030
}
3131
});
32+
var retryRequest = require('retry-request');
33+
var streamForward = require('stream-forward');
34+
var through = require('through2');
3235
var uuid = require('node-uuid');
3336

3437
/** @const {object} gcloud-node's package.json file. */
@@ -178,34 +181,68 @@ nodeutil.inherits(ApiError, Error);
178181
*/
179182
function handleResp(err, resp, body, callback) {
180183
callback = callback || noop;
184+
181185
if (err) {
182186
callback(err);
183187
return;
184188
}
185-
if (typeof body === 'string') {
186-
try {
187-
body = JSON.parse(body);
188-
} catch(err) {}
189-
}
190-
if (body && body.error) {
191-
// Error from JSON api.
192-
callback(new ApiError(body.error));
189+
190+
var parsedApiResponse = util.parseApiResp(resp, body);
191+
192+
if (parsedApiResponse.err) {
193+
callback(parsedApiResponse.err);
193194
return;
194195
}
195-
if (resp && (resp.statusCode < 200 || resp.statusCode > 299)) {
196+
197+
callback(null, parsedApiResponse.body, parsedApiResponse.resp);
198+
}
199+
200+
util.handleResp = handleResp;
201+
202+
/**
203+
* From an HTTP response, generate an error if one occurred.
204+
*
205+
* @param {*} resp - Response value.
206+
* @param {*=} body - Body value.
207+
* @return {object} parsedResponse - The parsed response.
208+
* @param {?error} parsedResponse.err - An error detected.
209+
* @param {object} parsedResponse.resp - The original response object.
210+
* @param {*} parsedREsponse.body - The original body value provided will try to
211+
* be JSON.parse'd. If it's successful, the parsed value will be returned
212+
* here, otherwise the original value.
213+
*/
214+
function parseApiResp(resp, body) {
215+
var parsedResponse = {
216+
err: null,
217+
resp: resp,
218+
body: body
219+
};
220+
221+
if (resp.statusCode < 200 || resp.statusCode > 299) {
196222
// Unknown error. Format according to ApiError standard.
197-
callback(new ApiError({
223+
parsedResponse.err = new ApiError({
198224
errors: [],
199225
code: resp.statusCode,
200-
message: body || 'Error during request.',
226+
message: 'Error during request.',
201227
response: resp
202-
}));
203-
return;
228+
});
229+
}
230+
231+
if (util.is(body, 'string')) {
232+
try {
233+
parsedResponse.body = JSON.parse(body);
234+
} catch(err) {}
235+
}
236+
237+
if (parsedResponse.body && parsedResponse.body.error) {
238+
// Error from JSON API.
239+
parsedResponse.err = new ApiError(parsedResponse.body.error);
204240
}
205-
callback(null, body, resp);
241+
242+
return parsedResponse;
206243
}
207244

208-
util.handleResp = handleResp;
245+
util.parseApiResp = parseApiResp;
209246

210247
/**
211248
* Get the type of a value.
@@ -418,20 +455,6 @@ function makeWritableStream(dup, options, onComplete) {
418455

419456
util.makeWritableStream = makeWritableStream;
420457

421-
/**
422-
* Returns an exponential distributed time to wait given the number of retries
423-
* that have been previously been attempted on the request.
424-
*
425-
* @param {number} retryNumber - The number of retries previously attempted.
426-
* @return {number} An exponentially distributed time to wait E.g. for use with
427-
* exponential backoff.
428-
*/
429-
function getNextRetryWait(retryNumber) {
430-
return (Math.pow(2, retryNumber) * 1000) + Math.floor(Math.random() * 1000);
431-
}
432-
433-
util.getNextRetryWait = getNextRetryWait;
434-
435458
/**
436459
* Returns true if the API request should be retried, given the error that was
437460
* given the first time the request was attempted. This is used for rate limit
@@ -599,31 +622,45 @@ function makeAuthorizedRequestFactory(config) {
599622
* be made. Instead, this function is passed the error & authorized
600623
* request options.
601624
*/
602-
function makeAuthorizedRequest(reqOpts, callback) {
603-
if (config.customEndpoint) {
604-
// Using a custom API override. Do not use `google-auth-library` for
605-
// authentication. (ex: connecting to a local Datastore server)
606-
if (callback.onAuthorized) {
607-
callback.onAuthorized(null, reqOpts);
608-
} else {
609-
util.makeRequest(reqOpts, config, callback);
610-
}
625+
function makeAuthorizedRequest(reqOpts, options) {
626+
var stream;
627+
var reqConfig = extend({}, config);
611628

612-
return;
629+
if (!options) {
630+
stream = through();
631+
reqConfig.stream = stream;
613632
}
614633

615-
util.authorizeRequest(config, reqOpts, function(err, authorizedReqOpts) {
634+
function onAuthorized(err, authorizedReqOpts) {
616635
if (err) {
617-
(callback.onAuthorized || callback)(err);
636+
if (stream) {
637+
stream.emit('error', err);
638+
stream.end();
639+
} else {
640+
(options || options.onAuthorized)(err);
641+
}
642+
618643
return;
619644
}
620645

621-
if (callback.onAuthorized) {
622-
callback.onAuthorized(null, authorizedReqOpts);
646+
if (options && options.onAuthorized) {
647+
options.onAuthorized(null, authorizedReqOpts);
623648
} else {
624-
util.makeRequest(authorizedReqOpts, config, callback);
649+
util.makeRequest(authorizedReqOpts, reqConfig, options);
625650
}
626-
});
651+
}
652+
653+
if (reqConfig.customEndpoint) {
654+
// Using a custom API override. Do not use `google-auth-library` for
655+
// authentication. (ex: connecting to a local Datastore server)
656+
onAuthorized(null, reqOpts);
657+
} else {
658+
util.authorizeRequest(reqConfig, reqOpts, onAuthorized);
659+
}
660+
661+
if (stream) {
662+
return stream;
663+
}
627664
}
628665

629666
makeAuthorizedRequest.getCredentials = function(callback) {
@@ -653,8 +690,8 @@ function makeAuthorizedRequestFactory(config) {
653690
util.makeAuthorizedRequestFactory = makeAuthorizedRequestFactory;
654691

655692
/**
656-
* Make a request through the `request` module with built-in error handling and
657-
* exponential back off.
693+
* Make a request through the `retryRequest` module with built-in error handling
694+
* and exponential back off.
658695
*
659696
* @param {object} reqOpts - Request options in the format `request` expects.
660697
* @param {object=} config - Configuration object.
@@ -676,30 +713,26 @@ function makeRequest(reqOpts, config, callback) {
676713

677714
reqOpts = util.decorateRequest(reqOpts);
678715

679-
var MAX_RETRIES = config.maxRetries || 3;
680-
var autoRetry = config.autoRetry !== false ? true : false;
681-
var attemptedRetries = 0;
716+
var options = {
717+
request: request,
682718

683-
function shouldRetry(err) {
684-
return autoRetry &&
685-
MAX_RETRIES > attemptedRetries &&
686-
util.shouldRetryRequest(err);
687-
}
719+
retries: config.autoRetry !== false ? config.maxRetries || 3 : 0,
688720

689-
function makeRateLimitedRequest() {
690-
request(reqOpts, function(err, resp, body) {
691-
util.handleResp(err, resp, body, function(err, body, resp) {
692-
if (shouldRetry(err)) {
693-
var delay = util.getNextRetryWait(attemptedRetries++);
694-
setTimeout(makeRateLimitedRequest, delay);
695-
} else {
696-
callback(err || null, body, resp);
697-
}
698-
});
721+
shouldRetryFn: function(resp) {
722+
var err = util.parseApiResp(resp).err;
723+
return err && util.shouldRetryRequest(err);
724+
}
725+
};
726+
727+
if (config.stream) {
728+
// `streamForward` is used to re-emit the events the request stream receives
729+
// on to the stream the user is holding (config.stream).
730+
streamForward(retryRequest(reqOpts, options)).pipe(config.stream);
731+
} else {
732+
retryRequest(reqOpts, options, function(err, response, body) {
733+
util.handleResp(err, response, body, callback);
699734
});
700735
}
701-
702-
makeRateLimitedRequest();
703736
}
704737

705738
util.makeRequest = makeRequest;

0 commit comments

Comments
 (0)