diff --git a/lib/common/grpc-service.js b/lib/common/grpc-service.js index 48e48b7b4e0..1d663b04ff7 100644 --- a/lib/common/grpc-service.js +++ b/lib/common/grpc-service.js @@ -20,11 +20,13 @@ 'use strict'; +var extend = require('extend'); var googleProtoFiles = require('google-proto-files'); var grpc = require('grpc'); var is = require('is'); var nodeutil = require('util'); var path = require('path'); +var retryRequest = require('retry-request'); /** * @type {module:common/service} @@ -151,6 +153,8 @@ function GrpcService(config, options) { this.grpcCredentials = grpc.credentials.createInsecure(); } + this.maxRetries = config.maxRetries; + var apiVersion = config.apiVersion; var service = this.service = config.service; var rootDir = googleProtoFiles('..'); @@ -241,19 +245,47 @@ GrpcService.prototype.request = function(protoOpts, reqOpts, callback) { grpcOpts.deadline = new Date(Date.now() + protoOpts.timeout); } - service[protoOpts.method](reqOpts, function(err, resp) { - if (err) { - if (HTTP_ERROR_CODE_MAP[err.code]) { - var httpError = HTTP_ERROR_CODE_MAP[err.code]; - err.code = httpError.code; - } - - callback(err); - return; + // Retains a reference to an error from the response. If the final callback is + // executed with this as the "response", we return it to the user as an error. + var respError; + + retryRequest(null, { + shouldRetryFn: function(resp) { + return [429, 500, 502, 503].indexOf(resp.code) > -1; + }, + + maxRetries: this.maxRetries, + + // retry-request determines if it should retry from the incoming HTTP + // response status. gRPC always returns an error proto message. We pass that + // "error" into retry-request to act as the HTTP response, so it can use the + // status code to determine if it should retry. + request: function(_, onResponse) { + respError = null; + + service[protoOpts.method](reqOpts, function(err, resp) { + if (err) { + if (HTTP_ERROR_CODE_MAP[err.code]) { + respError = extend(err, HTTP_ERROR_CODE_MAP[err.code]); + onResponse(null, respError); + return; + } + + onResponse(err); + return; + } + + onResponse(null, resp); + }, null, grpcOpts); + } + }, function(err, resp) { + if (!err && resp === respError) { + err = respError; + resp = null; } - callback(null, resp); - }, null, grpcOpts); + callback(err, resp); + }); }; /** diff --git a/test/common/grpc-service.js b/test/common/grpc-service.js index bb7676692f3..00a95fd4937 100644 --- a/test/common/grpc-service.js +++ b/test/common/grpc-service.js @@ -23,6 +23,9 @@ var grpc = require('grpc'); var is = require('is'); var mockery = require('mockery-next'); var path = require('path'); +var retryRequest = require('retry-request'); + +var util = require('../../lib/common/util.js'); function FakeService() { this.calledWith_ = arguments; @@ -33,6 +36,11 @@ function fakeGoogleProtoFiles() { return (googleProtoFilesOverride || googleProtoFiles).apply(null, arguments); } +var retryRequestOverride; +function fakeRetryRequest() { + return (retryRequestOverride || retryRequest).apply(null, arguments); +} + var grpcLoadOverride; var fakeGrpc = { load: function() { @@ -74,7 +82,8 @@ describe('GrpcService', function() { var CONFIG = { proto: {}, service: 'Service', - apiVersion: 'v1' + apiVersion: 'v1', + maxRetries: 3 }; var OPTIONS = {}; @@ -90,6 +99,7 @@ describe('GrpcService', function() { before(function() { mockery.registerMock('google-proto-files', fakeGoogleProtoFiles); + mockery.registerMock('retry-request', fakeRetryRequest); mockery.registerMock('grpc', fakeGrpc); mockery.registerMock('../../lib/common/service.js', FakeService); @@ -108,6 +118,8 @@ describe('GrpcService', function() { }); beforeEach(function() { + retryRequestOverride = null; + googleProtoFilesOverride = function() { return ROOT_DIR; }; @@ -135,6 +147,10 @@ describe('GrpcService', function() { assert.strictEqual(calledWith[1], OPTIONS); }); + it('should localize maxRetries', function() { + assert.strictEqual(grpcService.maxRetries, CONFIG.maxRetries); + }); + it('should get the root directory for the proto files', function(done) { googleProtoFilesOverride = function(path) { assert.strictEqual(path, '..'); @@ -219,6 +235,93 @@ describe('GrpcService', function() { var REQ_OPTS = {}; var GRPC_CREDENTIALS = {}; + var HTTP_ERROR_CODE_MAP = { + 0: { + code: 200, + message: 'OK' + }, + + 1: { + code: 499, + message: 'Client Closed Request' + }, + + 2: { + code: 500, + message: 'Internal Server Error' + }, + + 3: { + code: 400, + message: 'Bad Request' + }, + + 4: { + code: 504, + message: 'Gateway Timeout' + }, + + 5: { + code: 404, + message: 'Not Found' + }, + + 6: { + code: 409, + message: 'Conflict' + }, + + 7: { + code: 403, + message: 'Forbidden' + }, + + 8: { + code: 429, + message: 'Too Many Requests' + }, + + 9: { + code: 412, + message: 'Precondition Failed' + }, + + 10: { + code: 409, + message: 'Conflict' + }, + + 11: { + code: 400, + message: 'Bad Request' + }, + + 12: { + code: 501, + message: 'Not Implemented' + }, + + 13: { + code: 500, + message: 'Internal Server Error' + }, + + 14: { + code: 503, + message: 'Service Unavailable' + }, + + 15: { + code: 500, + message: 'Internal Server Error' + }, + + 16: { + code: 401, + message: 'Unauthorized' + } + }; + function ProtoService() {} ProtoService.prototype.method = function() {}; @@ -353,13 +456,122 @@ describe('GrpcService', function() { grpcService.request(PROTO_OPTS, REQ_OPTS, assert.ifError); }); + describe('retry strategy', function() { + var retryRequestReqOpts; + var retryRequestOptions; + var retryRequestCallback; + + beforeEach(function() { + grpcService.protos.Service = { + service: util.noop + }; + + retryRequestOverride = function(reqOpts, options, callback) { + retryRequestReqOpts = reqOpts; + retryRequestOptions = options; + retryRequestCallback = callback; + }; + }); + + it('should use retry-request', function(done) { + var error = {}; + var response = {}; + + grpcService.request(PROTO_OPTS, REQ_OPTS, function(err, resp) { + assert.strictEqual(err, error); + assert.strictEqual(resp, response); + done(); + }); + + assert.strictEqual(retryRequestReqOpts, null); + assert.strictEqual( + retryRequestOptions.maxRetries, + grpcService.maxRetries + ); + + retryRequestCallback(error, response); + }); + + it('should retry on 429, 500, 502, and 503', function() { + grpcService.request(PROTO_OPTS, REQ_OPTS, assert.ifError); + + var shouldRetryFn = retryRequestOptions.shouldRetryFn; + + var retryErrors = [ + { code: 429 }, + { code: 500 }, + { code: 502 }, + { code: 503 } + ]; + + var nonRetryErrors = [ + { code: 200 }, + { code: 401 }, + { code: 404 }, + { code: 409 }, + { code: 412 } + ]; + + assert.strictEqual(retryErrors.every(shouldRetryFn), true); + assert.strictEqual(nonRetryErrors.every(shouldRetryFn), false); + }); + + it('should treat a retriable error as an HTTP response', function(done) { + var grpcError500 = { code: 2 }; + + grpcService.protos.Service = { + service: function() { + return { + method: function(reqOpts, callback) { + callback(grpcError500); + } + }; + } + }; + + grpcService.request(PROTO_OPTS, REQ_OPTS, assert.ifError); + + function onResponse(err, resp) { + assert.strictEqual(err, null); + assert.deepEqual(resp, HTTP_ERROR_CODE_MAP[2]); + + done(); + } + + retryRequestOptions.request({}, onResponse); + }); + + it('should exec callback with response error as error', function(done) { + var grpcError500 = { code: 2 }; + + grpcService.protos.Service = { + service: function() { + return { + method: function(reqOpts, callback) { + callback(grpcError500); + } + }; + } + }; + + grpcService.request(PROTO_OPTS, REQ_OPTS, function(err, resp) { + assert.deepEqual(err, HTTP_ERROR_CODE_MAP[2]); + assert.strictEqual(resp, null); + done(); + }); + + // When the gRPC error is passed to "onResponse", it will just invoke + // the callback passed to retry-request. We will check if the grpc Error + retryRequestOptions.request({}, retryRequestCallback); + }); + }); + it('should make the correct request on the proto service', function(done) { grpcService.protos.Service = { service: function() { return { method: function(reqOpts) { - assert.strictEqual(reqOpts.camelOption, undefined); - assert.strictEqual(reqOpts.camel_option, REQ_OPTS.camelOption); + assert.strictEqual(reqOpts, REQ_OPTS); done(); } }; @@ -413,93 +625,6 @@ describe('GrpcService', function() { }); describe('error', function() { - var HTTP_ERROR_CODE_MAP = { - 0: { - code: 200, - message: 'OK' - }, - - 1: { - code: 499, - message: 'Client Closed Request' - }, - - 2: { - code: 500, - message: 'Internal Server Error' - }, - - 3: { - code: 400, - message: 'Bad Request' - }, - - 4: { - code: 504, - message: 'Gateway Timeout' - }, - - 5: { - code: 404, - message: 'Not Found' - }, - - 6: { - code: 409, - message: 'Conflict' - }, - - 7: { - code: 403, - message: 'Forbidden' - }, - - 8: { - code: 429, - message: 'Too Many Requests' - }, - - 9: { - code: 412, - message: 'Precondition Failed' - }, - - 10: { - code: 409, - message: 'Conflict' - }, - - 11: { - code: 400, - message: 'Bad Request' - }, - - 12: { - code: 501, - message: 'Not Implemented' - }, - - 13: { - code: 500, - message: 'Internal Server Error' - }, - - 14: { - code: 503, - message: 'Service Unavailable' - }, - - 15: { - code: 500, - message: 'Internal Server Error' - }, - - 16: { - code: 401, - message: 'Unauthorized' - } - }; - it('should look up the http status from the code', function() { /*jshint loopfunc:true */ for (var grpcErrorCode in HTTP_ERROR_CODE_MAP) { @@ -526,9 +651,7 @@ describe('GrpcService', function() { }); describe('success', function() { - var RESPONSE = { - snake_property: true - }; + var RESPONSE = {}; beforeEach(function() { grpcService.protos.Service = {