From 50e1c6ce0f7ac7ab2349074509488c1a84712106 Mon Sep 17 00:00:00 2001 From: ifsnow Date: Fri, 21 Jul 2017 10:54:14 +0900 Subject: [PATCH] Useful options and performance improvements for Pool --- Readme.md | 65 ++ lib/ConnectionConfig.js | 52 +- lib/Pool.js | 262 ++------ lib/PoolConfig.js | 51 +- lib/PoolConnection.js | 87 ++- lib/PoolConnectionManager.js | 596 ++++++++++++++++++ lib/PoolConnectionManagerData.js | 185 ++++++ lib/protocol/sequences/ChangeUser.js | 10 +- test/common.js | 1 - .../connection/test-change-user-charset.js | 2 +- ...t-connection-config-flags-affected-rows.js | 2 + test/integration/connection/test-format.js | 14 + test/integration/connection/test-types.js | 11 + test/unit/connection/test-change-user.js | 14 +- .../pool/test-acquire-timeout-existing.js | 8 +- .../pool/test-check-connection-disabled.js | 38 ++ .../test-check-connection-interval-high.js | 39 ++ .../test-check-connection-interval-low.js | 39 ++ test/unit/pool/test-connection-bad.js | 5 +- test/unit/pool/test-connection-domain.js | 3 +- test/unit/pool/test-connection-reset.js | 5 +- test/unit/pool/test-destroy-connection.js | 6 +- test/unit/pool/test-end-error.js | 20 + test/unit/pool/test-evict.js | 32 + test/unit/pool/test-extra-methods.js | 31 + test/unit/pool/test-initial-size-wait.js | 38 ++ test/unit/pool/test-initial-size.js | 33 + test/unit/pool/test-max-idle-nolimit.js | 50 ++ test/unit/pool/test-max-idle.js | 52 ++ test/unit/pool/test-max-reuse-count.js | 30 + test/unit/pool/test-min-idle.js | 34 + .../pool/test-pool-connection-manager-data.js | 108 ++++ test/unit/pool/test-queue-timeout.js | 32 + test/unit/pool/test-release-event.js | 8 - 34 files changed, 1654 insertions(+), 309 deletions(-) create mode 100644 lib/PoolConnectionManager.js create mode 100644 lib/PoolConnectionManagerData.js create mode 100644 test/integration/connection/test-format.js create mode 100644 test/integration/connection/test-types.js create mode 100644 test/unit/pool/test-check-connection-disabled.js create mode 100644 test/unit/pool/test-check-connection-interval-high.js create mode 100644 test/unit/pool/test-check-connection-interval-low.js create mode 100644 test/unit/pool/test-end-error.js create mode 100644 test/unit/pool/test-evict.js create mode 100644 test/unit/pool/test-extra-methods.js create mode 100644 test/unit/pool/test-initial-size-wait.js create mode 100644 test/unit/pool/test-initial-size.js create mode 100644 test/unit/pool/test-max-idle-nolimit.js create mode 100644 test/unit/pool/test-max-idle.js create mode 100644 test/unit/pool/test-max-reuse-count.js create mode 100644 test/unit/pool/test-min-idle.js create mode 100644 test/unit/pool/test-pool-connection-manager-data.js create mode 100644 test/unit/pool/test-queue-timeout.js diff --git a/Readme.md b/Readme.md index 9fab849da..46b1dbb53 100644 --- a/Readme.md +++ b/Readme.md @@ -392,6 +392,35 @@ constructor. In addition to those options pools accept a few extras: * `queueLimit`: The maximum number of connection requests the pool will queue before returning an error from `getConnection`. If set to `0`, there is no limit to the number of queued connection requests. (Default: `0`) +* `queueTimeout`: The maximum number of milliseconds that the queued request will + wait for a connection when there are no available connections. If set to `0`, + wait indefinitely. (Default: `0`) +* `testOnBorrow`: Indicates whether the connection is validated before borrowed + from the pool. If the connection fails to validate, it is dropped from the pool. + (Default: `true`) +* `testOnBorrowInterval`: The number of milliseconds that indicates how often + to validate if the connection is working since it was last used. If set too low, + performance may decrease on heavy loaded systems. If set to `0`, It is checked + every time. (Default: `20000`) +* `initialSize`: The initial number of connections that are created when the + pool is started. If set to `0`, this feature is disabled. (Default: `0`) +* `maxIdle`: The maximum number of connections that can remain idle in the pool. + If set to `0`, there is no limit. (Default: `10`) +* `minIdle`: The minimum number of connections that can remain idle in the pool. + (Default: `0`) +* `maxReuseCount`: The maximum connection reuse count allows connections to be + gracefully closed and removed from the connection pool after a connection has + been borrowed a specific number of times. If set to `0`, this feature is disabled. + (Default: `0`) +* `timeBetweenEvictionRunsMillis` : The number of milliseconds to sleep between + runs of examining idle connections. The eviction timer will remove existent + idle conntions by `minEvictableIdleTimeMillis` or create new idle connections + by `minIdle`. If set to `0`, this feature is disabled. (Default: `0`) +* `minEvictableIdleTimeMillis`: The minimum amount of time the connection + may sit idle in the pool before it is eligible for eviction due to idle time. + If set to `0`, no connection will be dropped. (Default: `1800000`) +* `numTestsPerEvictionRun` : The number of connections to examine during each run + of the eviction timer (if any). (Default: `3`) ## Pool events @@ -442,6 +471,29 @@ pool.on('release', function (connection) { }); ``` +### prepared + +The pool will emit a `prepared` event when the pool is ready to use. +If `initialSize` is set, this is called after all initial connections are created. + +```js +pool.on('prepared', function (count) { + if (count > 0) { + console.log('Created %d initial connections', count); + } +}); +``` + +### eviction + +The pool will emit a `eviction` event when the eviction timer runs. + +```js +pool.on('eviction', function (result) { + console.log('Removed : %d / Created : %d', connection.removed, connection.created); +}); +``` + ## Closing all the connections in a pool When you are done using the pool, you have to end all the connections or the @@ -463,6 +515,19 @@ pending queries will still complete and the time to end the pool will vary. **Once `pool.end()` has been called, `pool.getConnection` and other operations can no longer be performed** +## Monitoring the status of a pool + +If you want to know about the status of the pool, use the `getStatus` method. +It provides 4 values(all, use, idle, queue). + +```js +var status = pool.getStatus(); +console.log('All connected connections : %d', status.all); +console.log('Connections being used : %d', status.use); +console.log('Idle connections : %d', status.idle); +console.log('Queued requests : %d', status.queue); +``` + ## PoolCluster PoolCluster provides multiple hosts connection. (group & retry & selector) diff --git a/lib/ConnectionConfig.js b/lib/ConnectionConfig.js index 147aa0abb..0ce6264a8 100644 --- a/lib/ConnectionConfig.js +++ b/lib/ConnectionConfig.js @@ -16,9 +16,7 @@ function ConnectionConfig(options) { this.user = options.user || undefined; this.password = options.password || undefined; this.database = options.database; - this.connectTimeout = (options.connectTimeout === undefined) - ? (10 * 1000) - : options.connectTimeout; + this.connectTimeout = (options.connectTimeout === undefined) ? (10 * 1000) : options.connectTimeout; this.insecureAuth = options.insecureAuth || false; this.supportBigNumbers = options.supportBigNumbers || false; this.bigNumberStrings = options.bigNumberStrings || false; @@ -30,13 +28,9 @@ function ConnectionConfig(options) { this.flags = options.flags || ''; this.queryFormat = options.queryFormat; this.pool = options.pool || undefined; - this.ssl = (typeof options.ssl === 'string') - ? ConnectionConfig.getSSLProfile(options.ssl) - : (options.ssl || false); + this.ssl = (typeof options.ssl === 'string') ? ConnectionConfig.getSSLProfile(options.ssl) : (options.ssl || false); this.multipleStatements = options.multipleStatements || false; - this.typeCast = (options.typeCast === undefined) - ? true - : options.typeCast; + this.typeCast = (options.typeCast === undefined) ? true : options.typeCast; if (this.timezone[0] === ' ') { // "+" is a url encoded char for space so it @@ -51,32 +45,42 @@ function ConnectionConfig(options) { } this.maxPacketSize = 0; - this.charsetNumber = (options.charset) - ? ConnectionConfig.getCharsetNumber(options.charset) - : options.charsetNumber || Charsets.UTF8_GENERAL_CI; + this.charsetNumber = (options.charset) ? ConnectionConfig.getCharsetNumber(options.charset) : options.charsetNumber || Charsets.UTF8_GENERAL_CI; // Set the client flags - var defaultFlags = ConnectionConfig.getDefaultFlags(options); - this.clientFlags = ConnectionConfig.mergeFlags(defaultFlags, options.flags); + this.clientFlags = ConnectionConfig._getClientFlags(this.multipleStatements, this.flags); } +var _clientFlagsCache = {}; + +ConnectionConfig._getClientFlags = function(multipleStatements, flags) { + var key = flags + multipleStatements; + + if (_clientFlagsCache[key] === undefined) { + var defaultFlags = ConnectionConfig.getDefaultFlags(multipleStatements); + _clientFlagsCache[key] = ConnectionConfig.mergeFlags(defaultFlags, flags); + } + + return _clientFlagsCache[key]; +}; + ConnectionConfig.mergeFlags = function mergeFlags(defaultFlags, userFlags) { var allFlags = ConnectionConfig.parseFlagList(defaultFlags); var newFlags = ConnectionConfig.parseFlagList(userFlags); // Merge the new flags - for (var flag in newFlags) { - if (allFlags[flag] !== false) { - allFlags[flag] = newFlags[flag]; + for (var newFlagKey in newFlags) { + if (allFlags[newFlagKey] !== false) { + allFlags[newFlagKey] = newFlags[newFlagKey]; } } // Build flags var flags = 0x0; - for (var flag in allFlags) { - if (allFlags[flag]) { + for (var key in allFlags) { + if (allFlags[key]) { // TODO: Throw here on some future release - flags |= ClientConstants['CLIENT_' + flag] || 0x0; + flags |= ClientConstants['CLIENT_' + key] || 0x0; } } @@ -93,7 +97,7 @@ ConnectionConfig.getCharsetNumber = function getCharsetNumber(charset) { return num; }; -ConnectionConfig.getDefaultFlags = function getDefaultFlags(options) { +ConnectionConfig.getDefaultFlags = function getDefaultFlags(multipleStatements) { var defaultFlags = [ '-COMPRESS', // Compression protocol *NOT* supported '-CONNECT_ATTRS', // Does *NOT* send connection attributes in Protocol::HandshakeResponse41 @@ -114,7 +118,7 @@ ConnectionConfig.getDefaultFlags = function getDefaultFlags(options) { '+TRANSACTIONS' // Expects status flags ]; - if (options && options.multipleStatements) { + if (multipleStatements) { // May send multiple statements per COM_QUERY and COM_STMT_PREPARE defaultFlags.push('+MULTI_STATEMENTS'); } @@ -143,9 +147,7 @@ ConnectionConfig.parseFlagList = function parseFlagList(flagList) { return allFlags; } - var flags = !Array.isArray(flagList) - ? String(flagList || '').toUpperCase().split(/\s*,+\s*/) - : flagList; + var flags = !Array.isArray(flagList) ? String(flagList || '').toUpperCase().split(/\s*,+\s*/) : flagList; for (var i = 0; i < flags.length; i++) { var flag = flags[i]; diff --git a/lib/Pool.js b/lib/Pool.js index 87a40114a..55263c561 100644 --- a/lib/Pool.js +++ b/lib/Pool.js @@ -2,189 +2,59 @@ var mysql = require('../'); var Connection = require('./Connection'); var EventEmitter = require('events').EventEmitter; var Util = require('util'); -var PoolConnection = require('./PoolConnection'); +var PoolConnectionManager = require('./PoolConnectionManager'); module.exports = Pool; Util.inherits(Pool, EventEmitter); function Pool(options) { EventEmitter.call(this); - this.config = options.config; - this.config.connectionConfig.pool = this; - this._acquiringConnections = []; - this._allConnections = []; - this._freeConnections = []; - this._connectionQueue = []; - this._closed = false; + this.config = options.config; + this._manager = new PoolConnectionManager(this, options.config); + this._closed = false; } -Pool.prototype.getConnection = function (cb) { - - if (this._closed) { - var err = new Error('Pool is closed.'); - err.code = 'POOL_CLOSED'; - process.nextTick(function () { - cb(err); - }); - return; - } - - var connection; - var pool = this; - - if (this._freeConnections.length > 0) { - connection = this._freeConnections.shift(); - this.acquireConnection(connection, cb); - return; - } - - if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) { - connection = new PoolConnection(this, { config: this.config.newConnectionConfig() }); - - this._acquiringConnections.push(connection); - this._allConnections.push(connection); - - connection.connect({timeout: this.config.acquireTimeout}, function onConnect(err) { - spliceConnection(pool._acquiringConnections, connection); - - if (pool._closed) { - err = new Error('Pool is closed.'); - err.code = 'POOL_CLOSED'; - } - - if (err) { - pool._purgeConnection(connection); - cb(err); - return; - } - - pool.emit('connection', connection); - pool.emit('acquire', connection); - cb(null, connection); - }); +Pool.prototype.getConnection = function (callback) { + if (this._isClosed(callback)) { return; } - if (!this.config.waitForConnections) { - process.nextTick(function(){ - var err = new Error('No connections available.'); - err.code = 'POOL_CONNLIMIT'; - cb(err); - }); - return; - } + var manager = this._manager; - this._enqueueCallback(cb); -}; - -Pool.prototype.acquireConnection = function acquireConnection(connection, cb) { - if (connection._pool !== this) { - throw new Error('Connection acquired from wrong pool.'); - } - - var changeUser = this._needsChangeUser(connection); - var pool = this; - - this._acquiringConnections.push(connection); - - function onOperationComplete(err) { - spliceConnection(pool._acquiringConnections, connection); - - if (pool._closed) { - err = new Error('Pool is closed.'); - err.code = 'POOL_CLOSED'; - } - - if (err) { - pool._connectionQueue.unshift(cb); - pool._purgeConnection(connection); + if (manager.isPrepared()) { + var connection = manager.getIdleConnection(); + if (connection) { + manager.executeCallback(callback, connection); return; } - if (changeUser) { - pool.emit('connection', connection); + if (manager.canCreateNewConnection()) { + manager.createNewConnection(callback); + return; } - - pool.emit('acquire', connection); - cb(null, connection); } - if (changeUser) { - // restore user back to pool configuration - connection.config = this.config.newConnectionConfig(); - connection.changeUser({timeout: this.config.acquireTimeout}, onOperationComplete); - } else { - // ping connection - connection.ping({timeout: this.config.acquireTimeout}, onOperationComplete); - } + // If there is no available connection + manager.putCallbackToQueue(callback); }; -Pool.prototype.releaseConnection = function releaseConnection(connection) { - - if (this._acquiringConnections.indexOf(connection) !== -1) { - // connection is being acquired - return; - } - - if (connection._pool) { - if (connection._pool !== this) { - throw new Error('Connection released to wrong pool'); - } - - if (this._freeConnections.indexOf(connection) !== -1) { - // connection already in free connection pool - // this won't catch all double-release cases - throw new Error('Connection already released'); - } else { - // add connection to end of free queue - this._freeConnections.push(connection); - this.emit('release', connection); - } - } - - if (this._closed) { - // empty the connection queue - this._connectionQueue.splice(0).forEach(function (cb) { - var err = new Error('Pool is closed.'); - err.code = 'POOL_CLOSED'; - process.nextTick(function () { - cb(err); - }); - }); - } else if (this._connectionQueue.length) { - // get connection with next waiting callback - this.getConnection(this._connectionQueue.shift()); +Pool.prototype.releaseConnection = function (connection) { + if (!this._closed) { + this._manager.releaseConnection(connection); } }; -Pool.prototype.end = function (cb) { +Pool.prototype.end = function (callback) { this._closed = true; - if (typeof cb !== 'function') { - cb = function (err) { + if (typeof callback !== 'function') { + callback = function (err) { if (err) throw err; }; } - var calledBack = false; - var waitingClose = 0; - - function onEnd(err) { - if (!calledBack && (err || --waitingClose <= 0)) { - calledBack = true; - cb(err); - } - } - - while (this._allConnections.length !== 0) { - waitingClose++; - this._purgeConnection(this._allConnections[0], onEnd); - } - - if (waitingClose === 0) { - process.nextTick(onEnd); - } + this._manager.destroy(callback); }; Pool.prototype.query = function (sql, values, cb) { @@ -217,78 +87,34 @@ Pool.prototype.query = function (sql, values, cb) { return query; }; -Pool.prototype._enqueueCallback = function _enqueueCallback(callback) { - - if (this.config.queueLimit && this._connectionQueue.length >= this.config.queueLimit) { - process.nextTick(function () { - var err = new Error('Queue limit reached.'); - err.code = 'POOL_ENQUEUELIMIT'; - callback(err); - }); - return; - } - - // Bind to domain, as dequeue will likely occur in a different domain - var cb = process.domain - ? process.domain.bind(callback) - : callback; - - this._connectionQueue.push(cb); - this.emit('enqueue'); -}; - -Pool.prototype._needsChangeUser = function _needsChangeUser(connection) { - var connConfig = connection.config; - var poolConfig = this.config.connectionConfig; - - // check if changeUser values are different - return connConfig.user !== poolConfig.user - || connConfig.database !== poolConfig.database - || connConfig.password !== poolConfig.password - || connConfig.charsetNumber !== poolConfig.charsetNumber; +Pool.prototype.escape = function(value) { + return mysql.escape(value, this.config.connectionConfig.stringifyObjects, this.config.connectionConfig.timezone); }; -Pool.prototype._purgeConnection = function _purgeConnection(connection, callback) { - var cb = callback || function () {}; - - if (connection.state === 'disconnected') { - connection.destroy(); - } - - this._removeConnection(connection); - - if (connection.state !== 'disconnected' && !connection._protocol._quitSequence) { - connection._realEnd(cb); - return; - } - - process.nextTick(cb); +Pool.prototype.escapeId = function escapeId(value) { + return mysql.escapeId(value, false); }; -Pool.prototype._removeConnection = function(connection) { - connection._pool = null; - - // Remove connection from all connections - spliceConnection(this._allConnections, connection); - - // Remove connection from free connections - spliceConnection(this._freeConnections, connection); - - this.releaseConnection(connection); +Pool.prototype.getStatus = function () { + return this._manager.getStatus(); }; -Pool.prototype.escape = function(value) { - return mysql.escape(value, this.config.connectionConfig.stringifyObjects, this.config.connectionConfig.timezone); +Pool.prototype._purgeConnection = function (connection) { + this._manager.purgeConnection(connection); }; -Pool.prototype.escapeId = function escapeId(value) { - return mysql.escapeId(value, false); -}; +Pool.prototype._isClosed = function (callback) { + if (this._closed) { + if (callback) { + var err = new Error('Pool is closed.'); + err.code = 'POOL_CLOSED'; + process.nextTick(function () { + callback(err); + }); + } -function spliceConnection(array, connection) { - var index; - if ((index = array.indexOf(connection)) !== -1) { - // Remove connection from all connections - array.splice(index, 1); + return true; } -} + + return false; +}; diff --git a/lib/PoolConfig.js b/lib/PoolConfig.js index 8c5017a27..2def607e2 100644 --- a/lib/PoolConfig.js +++ b/lib/PoolConfig.js @@ -7,26 +7,41 @@ function PoolConfig(options) { options = ConnectionConfig.parseUrl(options); } - this.acquireTimeout = (options.acquireTimeout === undefined) - ? 10 * 1000 - : Number(options.acquireTimeout); - this.connectionConfig = new ConnectionConfig(options); - this.waitForConnections = (options.waitForConnections === undefined) - ? true - : Boolean(options.waitForConnections); - this.connectionLimit = (options.connectionLimit === undefined) - ? 10 - : Number(options.connectionLimit); - this.queueLimit = (options.queueLimit === undefined) - ? 0 - : Number(options.queueLimit); + this.connectionConfig = new ConnectionConfig(options); + + this.acquireTimeout = this._getPropertyNumber(options.acquireTimeout, 10000); + this.waitForConnections = this._getPropertyBoolean(options.waitForConnections, true); + this.connectionLimit = this._getPropertyNumber(options.connectionLimit, 10); + this.queueLimit = this._getPropertyNumber(options.queueLimit, 0); + this.queueTimeout = this._getPropertyNumber(options.queueTimeout, 0); + this.testOnBorrow = this._getPropertyBoolean(options.testOnBorrow, true); + this.testOnBorrowInterval = this._getPropertyNumber(options.testOnBorrowInterval, 20000); + this.initialSize = this._getPropertyNumber(options.initialSize, 0); + this.maxIdle = Math.min(this.connectionLimit, this._getPropertyNumber(options.maxIdle, 10)); + this.minIdle = Math.min(this.connectionLimit, this._getPropertyNumber(options.minIdle, 0)); + this.maxReuseCount = this._getPropertyNumber(options.maxReuseCount, 0); + this.timeBetweenEvictionRunsMillis = this._getPropertyNumber(options.timeBetweenEvictionRunsMillis, 0); + this.numTestsPerEvictionRun = this._getPropertyNumber(options.numTestsPerEvictionRun, 3); + this.minEvictableIdleTimeMillis = this._getPropertyNumber(options.minEvictableIdleTimeMillis, 1800000); } -PoolConfig.prototype.newConnectionConfig = function newConnectionConfig() { - var connectionConfig = new ConnectionConfig(this.connectionConfig); +PoolConfig.prototype.newConnectionConfig = function () { + var newConfig = {}; + var connectionConfig = this.connectionConfig; + + for (var key in connectionConfig) { + if (connectionConfig.hasOwnProperty(key)) { + newConfig[key] = connectionConfig[key]; + } + } + + return newConfig; +}; - connectionConfig.clientFlags = this.connectionConfig.clientFlags; - connectionConfig.maxPacketSize = this.connectionConfig.maxPacketSize; +PoolConfig.prototype._getPropertyNumber = function (value, defaultValue) { + return value === undefined ? defaultValue : Number(value); +}; - return connectionConfig; +PoolConfig.prototype._getPropertyBoolean = function (value, defaultValue) { + return value === undefined ? defaultValue : Boolean(value); }; diff --git a/lib/PoolConnection.js b/lib/PoolConnection.js index d29581804..3df9ddd60 100644 --- a/lib/PoolConnection.js +++ b/lib/PoolConnection.js @@ -5,9 +5,17 @@ var Events = require('events'); module.exports = PoolConnection; inherits(PoolConnection, Connection); +var _connectionId = 1; + function PoolConnection(pool, options) { Connection.call(this, options); - this._pool = pool; + + this._id = _connectionId++; + this._pool = pool; + this._used = true; + this._removed = false; + this._reuseCount = 0; + this._lastUsedTime = Date.now(); // Bind connection to pool domain if (Events.usingDomains) { @@ -25,40 +33,79 @@ function PoolConnection(pool, options) { }); } -PoolConnection.prototype.release = function release() { - var pool = this._pool; - - if (!pool || pool._closed) { - return undefined; +PoolConnection.prototype.release = function () { + if (this.hasPool()) { + this._pool.releaseConnection(this); } - - return pool.releaseConnection(this); }; // TODO: Remove this when we are removing PoolConnection#end PoolConnection.prototype._realEnd = Connection.prototype.end; PoolConnection.prototype.end = function () { - console.warn( 'Calling conn.end() to release a pooled connection is ' - + 'deprecated. In next version calling conn.end() will be ' - + 'restored to default conn.end() behavior. Use ' - + 'conn.release() instead.' + console.warn( 'Calling conn.end() to release a pooled connection is ' + + 'deprecated. In next version calling conn.end() will be ' + + 'restored to default conn.end() behavior. Use ' + + 'conn.release() instead.' ); this.release(); }; PoolConnection.prototype.destroy = function () { - Connection.prototype.destroy.apply(this, arguments); - this._removeFromPool(this); + Connection.prototype.destroy.call(this); + this._removeFromPool(); }; -PoolConnection.prototype._removeFromPool = function _removeFromPool() { - if (!this._pool || this._pool._closed) { - return; - } +PoolConnection.prototype.getId = function() { + return this._id; +}; + +PoolConnection.prototype.updateLastUsedTime = function() { + this._lastUsedTime = Date.now(); +}; + +PoolConnection.prototype.getLastUsedTime = function() { + return this._lastUsedTime; +}; + +PoolConnection.prototype.setUsed = function(used) { + this._used = used; +}; + +PoolConnection.prototype.isUsed = function() { + return this._used; +}; - var pool = this._pool; +PoolConnection.prototype.setRemoved = function(removed) { + this._removed = removed; +}; + +PoolConnection.prototype.isRemoved = function() { + return this._removed; +}; + +PoolConnection.prototype.increaseReuseCount = function() { + this._reuseCount++; +}; + +PoolConnection.prototype.getReuseCount = function() { + return this._reuseCount; +}; + +PoolConnection.prototype.hasPool = function() { + return this._pool !== null; +}; + +PoolConnection.prototype.isSamePool = function(pool) { + return this._pool === pool; +}; + +PoolConnection.prototype.detachPool = function() { this._pool = null; +}; - pool._purgeConnection(this); +PoolConnection.prototype._removeFromPool = function () { + if (this.hasPool()) { + this._pool._purgeConnection(this); + } }; diff --git a/lib/PoolConnectionManager.js b/lib/PoolConnectionManager.js new file mode 100644 index 000000000..ae24bc64b --- /dev/null +++ b/lib/PoolConnectionManager.js @@ -0,0 +1,596 @@ +var PoolConnection = require('./PoolConnection'); +var PoolConnectionManagerData = require('./PoolConnectionManagerData'); + +module.exports = PoolConnectionManager; + +/** + * PoolConnectionManager + * + * @constructor + * @param {Pool} pool + * @param {PoolConfig} config + * @api public + */ + +function PoolConnectionManager(pool, config) { + this._pool = pool; + this._config = config; + this._prepared = false; + + // Frequently used variables + this._neverWaitForConnections = !config.waitForConnections; + this._connectionLimit = config.connectionLimit; + this._queueLimit = config.queueLimit; + this._testOnBorrow = config.testOnBorrow; + this._testOnBorrowInterval = config.testOnBorrowInterval; + this._maxIdle = config.maxIdle; + this._maxReuseCount = config.maxReuseCount; + this._timeoutConfig = { + timeout: config.acquireTimeout + }; + + this._handleQueuedCallbacksRef = this._handleQueuedCallbacks.bind(this); + this._evictionTimerRef = this._evictionTimer.bind(this); + this._startEvitionTimerRef = this._startEvitionTimer.bind(this); + + // Variables for handling connections and callbacks + this._allConnection = new PoolConnectionManagerData.AllConnection(); + this._idleConnection = new PoolConnectionManagerData.IdleConnection(); + this._callbackQueue = new PoolConnectionManagerData.CallbackQueue(config.queueTimeout); + + this._createInitialConnections(config.initialSize); +} + +/** + * Checks if `Pool` is prepared. + * + * @return {boolean} + * @api public + */ + +PoolConnectionManager.prototype.isPrepared = function () { + return this._prepared; +}; + +/** + * Returns the idle connection. + * If there's no idle connection, return null. + * + * @returns {Object|null} + * @api public + */ + +PoolConnectionManager.prototype.getIdleConnection = function () { + if (this._idleConnection.isEmpty()) { + return null; + } + + var id = this._idleConnection.pop(); + var connection = this._allConnection.get(id); + + if (connection) { + connection.increaseReuseCount(); + } + + return connection; +}; + +/** + * Executes the callback with the acquired connection. + * + * @return {Function} callback + * @param {PoolConnection} connection + * @api public + */ + +PoolConnectionManager.prototype.executeCallback = function (callback, connection) { + var pool = this._pool; + var self = this; + + if (!connection.isSamePool(pool)) { + this._raiseError(callback, 'POOL_ERROR', 'Connection acquired from wrong pool.'); + return; + } + + // If user is changed + var changedUser = this._isChangedUser(connection); + + if (changedUser) { + // Restores user back to pool configuration + connection.config = this._config.newConnectionConfig(); + connection.changeUser(this._timeoutConfig, onOperationComplete); + return; + } + + if (!this._testOnBorrow) { + process.nextTick(function() { + self._executeCallbackDirectly(callback, connection); + }); + return; + } + + // Reuse recently used connections without ping check. + if (this._testOnBorrowInterval > 0 && Date.now() - connection.getLastUsedTime() < this._testOnBorrowInterval) { + process.nextTick(function() { + if (connection.isRemoved()) { + connection.ping(self._timeoutConfig, onOperationComplete); + } else { + self._executeCallbackDirectly(callback, connection); + } + }); + } else { + connection.ping(this._timeoutConfig, onOperationComplete); + } + + function onOperationComplete(err) { + if (pool._isClosed(callback)) { + return; + } + + if (err) { + self.purgeConnection(connection); + self._callbackQueue.rollback(callback); + process.nextTick(self._handleQueuedCallbacksRef); + return; + } + + if (changedUser) { + connection.config.changedUser = false; + pool.emit('connection', connection); + } + + self._executeCallbackDirectly(callback, connection); + } +}; + +PoolConnectionManager.prototype._executeCallbackDirectly = function (callback, connection) { + connection.setUsed(true); + this._pool.emit('acquire', connection); + callback(null, connection); +}; + +/** + * Checks if `Pool` can create a new connection. + * + * @return {Boolean} + * @api public + */ + +PoolConnectionManager.prototype.canCreateNewConnection = function () { + return (this._connectionLimit === 0 || this._allConnection.size() < this._connectionLimit); +}; + +/** + * Create a new connection. + * + * @param {Function} callback + * @api public + */ + +PoolConnectionManager.prototype.createNewConnection = function (callback) { + var pool = this._pool; + var self = this; + + var connection = new PoolConnection(pool, { config: this._config.newConnectionConfig() }); + + self._allConnection.add(connection); + + connection.connect(this._timeoutConfig, function onConnect(err) { + if (pool._isClosed(callback)) { + return; + } + + if (err) { + self.purgeConnection(connection); + callback(err); + return; + } + + pool.emit('connection', connection); + pool.emit('acquire', connection); + + callback(null, connection); + }); +}; + +/** + * Puts the callback into the queue. + * The callback is handled when possible. + * + * @param {Function} callback + * @api public + */ + +PoolConnectionManager.prototype.putCallbackToQueue = function (callback) { + if (this._neverWaitForConnections) { + this._raiseError(callback, 'POOL_CONNLIMIT', 'No connections available.'); + return; + } + + if (this._queueLimit && this._callbackQueue.size() >= this._queueLimit) { + this._raiseError(callback, 'POOL_ENQUEUELIMIT', 'Queue limit reached.'); + return; + } + + // Binds to domain, as dequeue will likely occur in a different domain + if (process.domain) { + this._callbackQueue.add(process.domain.bind(callback)); + } else { + this._callbackQueue.add(callback); + } + + this._pool.emit('enqueue'); +}; + +/** + * Releases a used connection. + * + * @param {PoolConnection} connection + * @api public + */ + +PoolConnectionManager.prototype.releaseConnection = function (connection) { + if (!connection.isUsed()) { + throw new Error('Connection already released'); + } + + if (!this._callbackQueue.isEmpty()) { + this.executeCallback(this._callbackQueue.pop(), connection); + return; + } + + if ((this._maxReuseCount > 0 && connection.getReuseCount() >= this._maxReuseCount) || + (this._maxIdle > 0 && this._idleConnection.size() >= this._maxIdle)) { + this._removeConnection(connection, true); + } else { + connection.updateLastUsedTime(); + connection.setUsed(false); + this._idleConnection.add(connection); + this._pool.emit('release', connection); + } +}; + +/** + * Destroys all. + * + * @param {Function} callback + * @api public + */ + +PoolConnectionManager.prototype.destroy = function (callback) { + if (this._evictionTimerHandle) { + clearTimeout(this._evictionTimerHandle); + } + + // throws error to waiting connections + var self = this; + + this._callbackQueue.destroy(function (queuedCallback) { + self._raiseError(queuedCallback, 'POOL_CLOSED', 'Pool is closed.'); + }); + + var allConnectionsSize = this._allConnection.size(); + if (allConnectionsSize === 0) { + callback(); + return; + } + + // purges all connections + var calledBack = false; + function onEndPurge(err) { + if (!calledBack && (err || --allConnectionsSize <= 0)) { + calledBack = true; + callback(err); + } + } + + this._allConnection.destroy(function (connection) { + self.purgeConnection(connection, onEndPurge); + }); +}; + +/** + * Purges the connection from `Pool`. + * + * @param {PoolConnection} connection + * @param {Function} callback + * @api public + */ + +PoolConnectionManager.prototype.purgeConnection = function (connection, callback) { + if (connection.isRemoved()) { + return; + } + + connection.setRemoved(true); + + if (typeof callback === 'undefined') { + callback = function() {}; + } + + var isDisconnected = connection.state === 'disconnected'; + if (isDisconnected) { + connection.destroy(); + } + + this._removeConnection(connection); + + if (!isDisconnected && !connection._protocol._quitSequence) { + connection._realEnd(callback); + return; + } + + process.nextTick(callback); +}; + +/** + * Returns the status of `Pool`. + * + * @return {Object} + * @api public + */ + +PoolConnectionManager.prototype.getStatus = function () { + return { + all : this._allConnection.size(), + use : this._allConnection.size() - this._idleConnection.size(), + idle : this._idleConnection.size(), + queue : this._callbackQueue.size() + }; +}; + +/** + * Removes the connection. + * + * @param {PoolConnection} connection + * @api private + */ + +PoolConnectionManager.prototype._removeConnection = function (connection, destory) { + if (connection.hasPool()) { + connection.detachPool(); + this._allConnection.remove(connection); + this._idleConnection.remove(connection); + + if (destory) { + process.nextTick(connection.destroy.bind(connection)); + } + } +}; + +/** + * Handles one of the callbacks that are waiting for the connection. + * + * @api private + */ + +PoolConnectionManager.prototype._handleQueuedCallbacks = function () { + if (this._callbackQueue.isEmpty()) { + return; + } + + var connection = this.getIdleConnection(); + if (connection) { + this.executeCallback(this._callbackQueue.pop(), connection); + } else if (this.canCreateNewConnection()) { + this.createNewConnection(this._callbackQueue.pop()); + } +}; + +/** + * Checks if user is changed. + * + * @param {PoolConnection} connection + * @return {Boolean} + * @api private + */ + +PoolConnectionManager.prototype._isChangedUser = function (connection) { + var connConfig = connection.config; + var poolConfig = this._config.connectionConfig; + + return connConfig.changedUser && + (connConfig.user !== poolConfig.user || + connConfig.database !== poolConfig.database || + connConfig.password !== poolConfig.password || + connConfig.charsetNumber !== poolConfig.charsetNumber); +}; + +/** + * Creates initial connections. + * + * @param {number} initialSize + * @api private + */ + +PoolConnectionManager.prototype._createInitialConnections = function (initialSize) { + this._prepared = false; + + if (initialSize === 0) { + this._onPreparedInitialConnections(0); + return; + } + + var self = this; + var createdConnectionCount = 0; + var step = 0; + + function onCreatedConnection(success) { + if (success) { + createdConnectionCount++; + } + + if (++step === initialSize) { + self._onPreparedInitialConnections(createdConnectionCount); + } + } + + for (var i = 1; i <= initialSize; i++) { + this._createNewIdleConnection(onCreatedConnection); + } +}; + +/** + * Creates a new idle connection. + * + * @param {Function} callback + * @api private + */ + +PoolConnectionManager.prototype._createNewIdleConnection = function (callback) { + var pool = this._pool; + var self = this; + + var connection = new PoolConnection(pool, { config: this._config.newConnectionConfig() }); + + connection.connect(this._timeoutConfig, function onConnect(err) { + if (pool._isClosed() || err) { + if (callback) { + callback(false); + } + return; + } + + connection.setUsed(false); + + self._allConnection.add(connection); + self._idleConnection.add(connection); + pool.emit('connection', connection); + + if (callback) { + callback(true); + } + }); +}; + +/** + * Called when `Pool` is ready. + * + * @param {number} createdConnectionCount + * @api private + */ + +PoolConnectionManager.prototype._onPreparedInitialConnections = function (createdConnectionCount) { + this._prepared = true; + + this._pool.emit('prepared', createdConnectionCount); + + if (createdConnectionCount > 0 && !this._callbackQueue.isEmpty()) { + for (var i = 0; i < createdConnectionCount; i++) { + process.nextTick(this._handleQueuedCallbacksRef); + } + } + + // starts idle connection evictor timer + if (this._config.timeBetweenEvictionRunsMillis > 0) { + this._startEvitionTimer(); + } +}; + +PoolConnectionManager.prototype._startEvitionTimer = function () { + if (this._evictionTimerHandle) { + clearTimeout(this._evictionTimerHandle); + this._evictionTimerHandle = null; + } + + this._evictionTimerHandle = setTimeout(this._evictionTimerRef, this._config.timeBetweenEvictionRunsMillis); +}; + +PoolConnectionManager.prototype._evictionTimer = function () { + var removedConnection = this._evictIdle(); + var createdConnection = this._ensureMinIdle(this._startEvitionTimerRef); + + this._pool.emit('eviction', { + removed : removedConnection, + created : createdConnection + }); +}; + +/** + * Evicts idle connection. + * + * @api private + */ + +PoolConnectionManager.prototype._evictIdle = function () { + var minEvictableIdleTimeMillis = this._config.minEvictableIdleTimeMillis; + if (minEvictableIdleTimeMillis <= 0) { + return 0; + } + + var idleConnectionCount = this._idleConnection.size(); + if (idleConnectionCount === 0) { + return 0; + } + + var targetConnectionCount = Math.min(this._config.numTestsPerEvictionRun, idleConnectionCount); + var targetConnections = this._idleConnection.lookup(targetConnectionCount); + + var checkTime = Date.now() - minEvictableIdleTimeMillis; + var removedConnection = 0; + + for (var i = 0; i < targetConnections.length; i++) { + var id = targetConnections[i]; + var connection = this._allConnection.get(id); + + if (!connection || connection.getLastUsedTime() < checkTime) { + this._removeConnection(connection, true); + removedConnection++; + } + } + + return removedConnection; +}; + +/** + * Checks if `Pool` has the minimum number of connections. + * Creates new connections if it's insufficient. + * + * @api private + */ + +PoolConnectionManager.prototype._ensureMinIdle = function (callback) { + var minIdle = this._config.minIdle; + if (minIdle <= 0) { + callback(); + return 0; + } + + var newConnectionCount = minIdle - this._idleConnection.size(); + if (newConnectionCount <= 0) { + callback(); + return 0; + } + + var step = 0; + function onCreatedConnection() { + if (++step === newConnectionCount) { + callback(); + } + } + + for (var i = 0; i < newConnectionCount; i++) { + this._createNewIdleConnection(onCreatedConnection); + } + + return newConnectionCount; +}; + +/** + * Raises the error to callback. + * + * @param {Function} callback + * @param {string} code + * @param {string} message + * @api private + */ + +PoolConnectionManager.prototype._raiseError = function (callback, code, message) { + if (callback) { + var err = new Error(message); + err.code = code; + process.nextTick(function () { + callback(err); + }); + } +}; diff --git a/lib/PoolConnectionManagerData.js b/lib/PoolConnectionManagerData.js new file mode 100644 index 000000000..99fbd9465 --- /dev/null +++ b/lib/PoolConnectionManagerData.js @@ -0,0 +1,185 @@ +var PoolConnectionManagerData = module.exports = {}; + +/** + * AllConnection : handling all connections. + */ + +var AllConnection = PoolConnectionManagerData.AllConnection = function() { + this._map = {}; + this._size = 0; +}; + +AllConnection.prototype.add = function(connection) { + this._map[connection.getId()] = connection; + this._size++; +}; + +AllConnection.prototype.set = function(connection) { + this._map[connection.getId()] = connection; +}; + +AllConnection.prototype.remove = function(connection) { + if (this._map[connection.getId()] !== undefined) { + delete this._map[connection.getId()]; + this._size--; + } +}; + +AllConnection.prototype.get = function(id) { + return this._map[id] || null; +}; + +AllConnection.prototype.isEmpty = function() { + return this._size === 0; +}; + +AllConnection.prototype.size = function() { + return this._size; +}; + +AllConnection.prototype.destroy = function(destroyCallback) { + if (this._size > 0) { + if (destroyCallback) { + var map = this._map; + for (var id in map) { + destroyCallback(map[id]); + } + } + + this._map = {}; + this._size = 0; + } +}; + +/** + * IdleConnection : handling idle connections. + */ + +var IdleConnection = PoolConnectionManagerData.IdleConnection = function() { + this._stack = []; +}; + +IdleConnection.prototype.add = function(connection) { + this._stack.push(connection.getId()); +}; + +IdleConnection.prototype.remove = function(connection) { + var index = this._stack.indexOf(connection.getId()); + + if (index !== -1) { + this._stack.splice(index, 1); + } +}; + +IdleConnection.prototype.pop = function() { + return this._stack.pop() || null; +}; + +IdleConnection.prototype.isEmpty = function() { + return this._stack.length === 0; +}; + +IdleConnection.prototype.size = function() { + return this._stack.length; +}; + +IdleConnection.prototype.lookup = function(num) { + return this._stack.slice(0, num); +}; + +/** + * CallbackQueue : handling queued callbacks. + * + * @param {number} timeout + */ + +var CallbackQueue = PoolConnectionManagerData.CallbackQueue = function(timeout) { + this._queue = []; + this._id = 1; + this._timerHandles = {}; + this._timeout = timeout; +}; + +CallbackQueue.prototype.add = function(callback) { + this._queue.push(this._makeCallback(callback)); +}; + +CallbackQueue.prototype.pop = function() { + var item = this._queue.shift(); + + if (item) { + this._clearTimer(item.id); + return item.callback; + } else { + return item; + } +}; + +CallbackQueue.prototype.rollback = function(callback) { + this._queue.unshift(this._makeCallback(callback)); +}; + +CallbackQueue.prototype.isEmpty = function() { + return this._queue.length === 0; +}; + +CallbackQueue.prototype.size = function() { + return this._queue.length; +}; + +CallbackQueue.prototype.destroy = function(destroyCallback) { + var queue = this._queue; + + for (var i = 0, len = queue.length; i < len; i++) { + this._clearTimer(queue[i].id); + + if (destroyCallback) { + destroyCallback(queue[i].callback); + } + } + + this._queue = []; + this._id = 1; +}; + +CallbackQueue.prototype._makeCallback = function(callback) { + var queueItem = { + id : this._id++, + callback : callback + }; + + if (this._timeout > 0) { + var self = this; + var itemId = queueItem.id; + this._timerHandles[itemId] = setTimeout(function() { + self._onTimeout(itemId); + }, this._timeout); + } + + return queueItem; +}; + +CallbackQueue.prototype._onTimeout = function(id) { + this._clearTimer(id); + + var queue = this._queue; + + for (var i = 0, len = queue.length; i < len; i++) { + if (queue[i].id === id) { + var callback = queue[i].callback; + queue.splice(i, 1); + + var timeoutError = new Error('Queue timeout occurred.'); + timeoutError.code = 'POOL_QUEUETIMEOUT'; + callback(timeoutError); + break; + } + } +}; + +CallbackQueue.prototype._clearTimer = function(id) { + if (this._timeout > 0 && this._timerHandles[id] !== undefined) { + clearTimeout(this._timerHandles[id]); + delete this._timerHandles[id]; + } +}; diff --git a/lib/protocol/sequences/ChangeUser.js b/lib/protocol/sequences/ChangeUser.js index 26be6dbbd..645737d85 100644 --- a/lib/protocol/sequences/ChangeUser.js +++ b/lib/protocol/sequences/ChangeUser.js @@ -26,10 +26,12 @@ ChangeUser.prototype.start = function(handshakeInitializationPacket) { charsetNumber : this._charsetNumber }); - this._currentConfig.user = this._user; - this._currentConfig.password = this._password; - this._currentConfig.database = this._database; - this._currentConfig.charsetNumber = this._charsetNumber; + var currentConfig = this._currentConfig; + currentConfig.user = this._user; + currentConfig.password = this._password; + currentConfig.database = this._database; + currentConfig.charsetNumber = this._charsetNumber; + currentConfig.changedUser = true; this.emit('packet', packet); }; diff --git a/test/common.js b/test/common.js index db502b60f..1f2a5f57d 100644 --- a/test/common.js +++ b/test/common.js @@ -154,6 +154,5 @@ function mergeTestConfig(config) { password : process.env.MYSQL_PASSWORD, socketPath : process.env.MYSQL_SOCKET }, config); - return config; } diff --git a/test/integration/connection/test-change-user-charset.js b/test/integration/connection/test-change-user-charset.js index 45bf42075..e855c0404 100644 --- a/test/integration/connection/test-change-user-charset.js +++ b/test/integration/connection/test-change-user-charset.js @@ -9,7 +9,7 @@ common.getTestConnection(function (err, connection) { connection.query('SHOW VARIABLES LIKE \'character_set_client\'', function (err, result) { assert.ifError(err); - assert.strictEqual(result[0]['Value'], 'koi8r'); + assert.strictEqual(result[0].Value, 'koi8r'); connection.destroy(); }); diff --git a/test/integration/connection/test-connection-config-flags-affected-rows.js b/test/integration/connection/test-connection-config-flags-affected-rows.js index cd6e725ab..66258e765 100644 --- a/test/integration/connection/test-connection-config-flags-affected-rows.js +++ b/test/integration/connection/test-connection-config-flags-affected-rows.js @@ -12,6 +12,8 @@ common.getTestConnection({flags: '-FOUND_ROWS'}, function (err, connection) { common.useTestDb(connection); + connection.query('DROP TABLE IF EXISTS ??', [table], assert.ifError); + connection.query([ 'CREATE TABLE ?? (', '`a` int(11) unsigned NOT NULL AUTO_INCREMENT,', diff --git a/test/integration/connection/test-format.js b/test/integration/connection/test-format.js new file mode 100644 index 000000000..27ae171a7 --- /dev/null +++ b/test/integration/connection/test-format.js @@ -0,0 +1,14 @@ +var path = require('path'); +var assert = require('assert'); +var common = require('../../common'); +var lib = require(path.resolve(common.lib, '../index')); + +assert.equal( + lib.format('SELECT * FROM ?? WHERE ?? = ?', [ 'table', 'property', 123 ]), + 'SELECT * FROM `table` WHERE `property` = 123' +); + +assert.equal( + lib.format('INSERT INTO ?? SET ?', [ 'table', { property: 123 } ]), + 'INSERT INTO `table` SET `property` = 123' +); diff --git a/test/integration/connection/test-types.js b/test/integration/connection/test-types.js new file mode 100644 index 000000000..6fa8f2969 --- /dev/null +++ b/test/integration/connection/test-types.js @@ -0,0 +1,11 @@ +var path = require('path'); +var assert = require('assert'); +var common = require('../../common'); +var lib = require(path.resolve(common.lib, '../index')); +var types = require(path.resolve(common.lib, 'protocol/constants/types')); + +assert.equal(typeof lib.Types, 'object'); + +for (var k in types) { + assert.equal(lib.Types[k], types[k]); +} diff --git a/test/unit/connection/test-change-user.js b/test/unit/connection/test-change-user.js index 53ca0867a..686ba7815 100644 --- a/test/unit/connection/test-change-user.js +++ b/test/unit/connection/test-change-user.js @@ -21,8 +21,18 @@ server.listen(common.fakeServerPort, function(err) { assert.ifError(err); assert.strictEqual(result[0]['CURRENT_USER()'], 'user_2@localhost'); - connection.destroy(); - server.destroy(); + // should keep current user + connection.changeUser(function (err) { + assert.ifError(err); + + connection.query('SELECT CURRENT_USER()', function (err, result) { + assert.ifError(err); + assert.strictEqual(result[0]['CURRENT_USER()'], 'user_2@localhost'); + + connection.destroy(); + server.destroy(); + }); + }); }); }); }); diff --git a/test/unit/pool/test-acquire-timeout-existing.js b/test/unit/pool/test-acquire-timeout-existing.js index ffdb41603..16d8d8597 100644 --- a/test/unit/pool/test-acquire-timeout-existing.js +++ b/test/unit/pool/test-acquire-timeout-existing.js @@ -1,9 +1,10 @@ var assert = require('assert'); var common = require('../../common'); var pool = common.createPool({ - acquireTimeout : 200, - connectionLimit : 1, - port : common.fakeServerPort + acquireTimeout : 200, + connectionLimit : 1, + port : common.fakeServerPort, + testOnBorrowInterval : 0 }); var server = common.createFakeServer(); @@ -43,7 +44,6 @@ server.listen(common.fakeServerPort, function (err) { }); server.on('connection', function(incomingConnection) { - serverConn = incomingConnection; incomingConnection.handshake({ threadId: ++tid }); diff --git a/test/unit/pool/test-check-connection-disabled.js b/test/unit/pool/test-check-connection-disabled.js new file mode 100644 index 000000000..ee852ee8e --- /dev/null +++ b/test/unit/pool/test-check-connection-disabled.js @@ -0,0 +1,38 @@ +var assert = require('assert'); +var common = require('../../common'); + +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + testOnBorrow : false +}); + +var server = common.createFakeServer(); +var ping = false; + +server.listen(common.fakeServerPort, function(err){ + assert.ifError(err); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + conn.release(); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + assert.equal(ping, false); + + conn.release(); + server.destroy(); + }); + }); +}); + +server.on('connection', function(incomingConnection) { + incomingConnection.handshake(); + + incomingConnection.on('ping', function() { + ping = true; + this._sendPacket(new common.Packets.OkPacket()); + this._parser.resetPacketNumber(); + }); +}); diff --git a/test/unit/pool/test-check-connection-interval-high.js b/test/unit/pool/test-check-connection-interval-high.js new file mode 100644 index 000000000..e82ee1fbe --- /dev/null +++ b/test/unit/pool/test-check-connection-interval-high.js @@ -0,0 +1,39 @@ +var assert = require('assert'); +var common = require('../../common'); + +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + testOnBorrowInterval : 20000 +}); + +var server = common.createFakeServer(); +var ping = false; + +server.listen(common.fakeServerPort, function(err){ + assert.ifError(err); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + conn.release(); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + assert.equal(ping, false); + + conn.release(); + server.destroy(); + }); + }); +}); + +server.on('connection', function(incomingConnection) { + incomingConnection.handshake(); + + incomingConnection.on('ping', function() { + console.log('ping'); + ping = true; + this._sendPacket(new common.Packets.OkPacket()); + this._parser.resetPacketNumber(); + }); +}); diff --git a/test/unit/pool/test-check-connection-interval-low.js b/test/unit/pool/test-check-connection-interval-low.js new file mode 100644 index 000000000..446b63378 --- /dev/null +++ b/test/unit/pool/test-check-connection-interval-low.js @@ -0,0 +1,39 @@ +var assert = require('assert'); +var common = require('../../common'); + +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + testOnBorrowInterval : 200 +}); + +var server = common.createFakeServer(); +var ping = false; + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + pool.getConnection(function(err, conn) { + assert.ifError(err); + conn.release(); + + setTimeout(function() { + pool.getConnection(function(err) { + assert.ifError(err); + assert.equal(ping, true); + + server.destroy(); + }); + }, 300); + }); +}); + +server.on('connection', function(incomingConnection) { + incomingConnection.handshake(); + + incomingConnection.on('ping', function() { + ping = true; + this._sendPacket(new common.Packets.OkPacket()); + this._parser.resetPacketNumber(); + }); +}); diff --git a/test/unit/pool/test-connection-bad.js b/test/unit/pool/test-connection-bad.js index e9b6dbac4..7b21b402a 100644 --- a/test/unit/pool/test-connection-bad.js +++ b/test/unit/pool/test-connection-bad.js @@ -1,8 +1,9 @@ var assert = require('assert'); var common = require('../../common'); var pool = common.createPool({ - connectionLimit : 1, - port : common.fakeServerPort + connectionLimit : 1, + port : common.fakeServerPort, + testOnBorrowInterval : 0 }); var server = common.createFakeServer(); diff --git a/test/unit/pool/test-connection-domain.js b/test/unit/pool/test-connection-domain.js index f1ec28e31..2373cb7e2 100644 --- a/test/unit/pool/test-connection-domain.js +++ b/test/unit/pool/test-connection-domain.js @@ -32,6 +32,7 @@ server.listen(common.fakeServerPort, function (err) { assert.ok(!domain.active, 'no current domain'); pool.getConnection(function (err, conn) { + assert.ifError(err); assert.equal(domain.active, d0, 'current domain is d0'); assert.equal(conn.domain, d0, 'connection domain is d0'); @@ -41,7 +42,7 @@ server.listen(common.fakeServerPort, function (err) { }, 200); d0.run(function () { - pool = common.createPool({port: common.fakeServerPort, connectionLimit: 1}); + pool = common.createPool({port: common.fakeServerPort, connectionLimit: 1, testOnBorrowInterval: 0 }); assert.equal(pool.domain, d0, 'pool belongs to d0'); d1.run(function () { diff --git a/test/unit/pool/test-connection-reset.js b/test/unit/pool/test-connection-reset.js index 5e9a4c2d7..f2511d03d 100644 --- a/test/unit/pool/test-connection-reset.js +++ b/test/unit/pool/test-connection-reset.js @@ -1,8 +1,9 @@ var assert = require('assert'); var common = require('../../common'); var pool = common.createPool({ - connectionLimit : 1, - port : common.fakeServerPort + connectionLimit : 1, + port : common.fakeServerPort, + testOnBorrowInterval : 0 }); var server = common.createFakeServer(); diff --git a/test/unit/pool/test-destroy-connection.js b/test/unit/pool/test-destroy-connection.js index 478a70fc0..1f1c5a702 100644 --- a/test/unit/pool/test-destroy-connection.js +++ b/test/unit/pool/test-destroy-connection.js @@ -10,11 +10,11 @@ server.listen(common.fakeServerPort, function (err) { pool.getConnection(function (err, connection) { assert.ifError(err); - assert.strictEqual(connection, pool._allConnections[0]); + assert.strictEqual(connection, pool._manager._allConnection.get(connection.getId())); connection.destroy(); - assert.strictEqual(pool._allConnections.length, 0); - assert.ok(!connection._pool); + assert.equal(pool.getStatus().all, 0); + assert.equal(connection._pool, null); assert.doesNotThrow(function () { connection.release(); }); diff --git a/test/unit/pool/test-end-error.js b/test/unit/pool/test-end-error.js new file mode 100644 index 000000000..0ea73cb04 --- /dev/null +++ b/test/unit/pool/test-end-error.js @@ -0,0 +1,20 @@ +var assert = require('assert'); +var common = require('../../common'); +var pool = common.createPool({port: common.fakeServerPort}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function (err) { + assert.ifError(err); + + pool.end(function (err) { + assert.ifError(err); + + pool.getConnection(function(err, connection) { + assert.ok(err); + assert.equal(err.code, 'POOL_CLOSED'); + assert.equal(connection, undefined); + server.destroy(); + }); + }); +}); diff --git a/test/unit/pool/test-evict.js b/test/unit/pool/test-evict.js new file mode 100644 index 000000000..38b21f6dc --- /dev/null +++ b/test/unit/pool/test-evict.js @@ -0,0 +1,32 @@ +var assert = require('assert'); +var common = require('../../common'); + +var pool = common.createPool({ + port : common.fakeServerPort, + initialSize : 3, + timeBetweenEvictionRunsMillis : 500, + numTestsPerEvictionRun : 1, + minEvictableIdleTimeMillis : 100 +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + pool.on('prepared', function() { + assert.equal(pool.getStatus().idle, 3); + + var expectedIdle = 2; + pool.on('eviction', function(result) { + assert.equal(result.removed, 1); + assert.equal(pool.getStatus().idle, expectedIdle--); + + if (expectedIdle === 0) { + pool.end(function() { + server.destroy(); + }); + } + }); + }); +}); diff --git a/test/unit/pool/test-extra-methods.js b/test/unit/pool/test-extra-methods.js new file mode 100644 index 000000000..fd4c63371 --- /dev/null +++ b/test/unit/pool/test-extra-methods.js @@ -0,0 +1,31 @@ +var assert = require('assert'); +var common = require('../../common'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + testOnBorrow : false +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + pool.getConnection(function(err, conn) { + assert.ifError(err); + + assert.equal(conn.isUsed(), true); + assert.equal(conn.isRemoved(), false); + var lastUsedTime = conn.getLastUsedTime(); + + conn.release(); + assert.equal(conn.isUsed(), false); + + pool.getConnection(function(err, conn) { + assert.ok(conn.getLastUsedTime() !== lastUsedTime); + assert.ok(conn.getReuseCount() === 1); + + server.destroy(); + }); + }); +}); diff --git a/test/unit/pool/test-initial-size-wait.js b/test/unit/pool/test-initial-size-wait.js new file mode 100644 index 000000000..4b8f5fe8d --- /dev/null +++ b/test/unit/pool/test-initial-size-wait.js @@ -0,0 +1,38 @@ +var assert = require('assert'); +var common = require('../../common'); + +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + initialSize : 2 +}); + +var server = common.createFakeServer(); +var connection1time = 0, connection2time = 0; + +server.listen(common.fakeServerPort, function(err){ + assert.ifError(err); + + pool.getConnection(function(err, conn){ + connection1time = Date.now(); + assert.ifError(err); + + setTimeout(function() { + conn.release(); + }, 500); + }); + + pool.getConnection(function(err, conn) { + assert.ifError(err); + connection2time = Date.now(); + assert.ok(connection2time - connection1time < 250, 'queued callbacks must be run at the same time'); + conn.release(); + server.destroy(); + }); +}); + +server.on('connection', function(incomingConnection) { + setTimeout(function() { + incomingConnection.handshake(); + }, 300); +}); diff --git a/test/unit/pool/test-initial-size.js b/test/unit/pool/test-initial-size.js new file mode 100644 index 000000000..f4584c740 --- /dev/null +++ b/test/unit/pool/test-initial-size.js @@ -0,0 +1,33 @@ +var assert = require('assert'); +var common = require('../../common'); + +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + initialSize : 2 +}); + +var server = common.createFakeServer(); +var prepared = false; + +server.listen(common.fakeServerPort, function(err){ + assert.ifError(err); + + pool.on('prepared', function(preparedConnectionCount) { + prepared = true; + assert.equal(preparedConnectionCount, 2); + }); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + conn.release(); + }); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + assert.equal(prepared, true); + + conn.release(); + server.destroy(); + }); +}); diff --git a/test/unit/pool/test-max-idle-nolimit.js b/test/unit/pool/test-max-idle-nolimit.js new file mode 100644 index 000000000..e43507f6c --- /dev/null +++ b/test/unit/pool/test-max-idle-nolimit.js @@ -0,0 +1,50 @@ +var assert = require('assert'); +var common = require('../../common'); + +var pool = common.createPool({ + connectionLimit : 2, + port : common.fakeServerPort, + maxIdle : 0 +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + pool.getConnection(function (err, connection) { + assert.ifError(err); + + setTimeout(function() { + connection.release(); + }, 1000); + }); + + pool.getConnection(function (err, connection) { + assert.ifError(err); + + setTimeout(function() { + connection.release(); + }, 1000); + }); + + setTimeout(function() { + assert.deepEqual(pool.getStatus(), { + all : 2, + use : 2, + idle : 0, + queue : 0 + }); + }, 500); + + setTimeout(function() { + assert.deepEqual(pool.getStatus(), { + all : 2, + use : 0, + idle : 2, + queue : 0 + }); + + server.destroy(); + }, 1500); +}); diff --git a/test/unit/pool/test-max-idle.js b/test/unit/pool/test-max-idle.js new file mode 100644 index 000000000..58899b49b --- /dev/null +++ b/test/unit/pool/test-max-idle.js @@ -0,0 +1,52 @@ +var assert = require('assert'); +var common = require('../../common'); + +var pool = common.createPool({ + connectionLimit : 2, + port : common.fakeServerPort, + maxIdle : 1 +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + pool.getConnection(function (err, connection) { + assert.ifError(err); + + setTimeout(function() { + connection.release(); + }, 1000); + }); + + pool.getConnection(function (err, connection) { + assert.ifError(err); + + setTimeout(function() { + connection.release(); + }, 1000); + }); + + setTimeout(function() { + assert.deepEqual(pool.getStatus(), { + all : 2, + use : 2, + idle : 0, + queue : 0 + }); + }, 500); + + setTimeout(function() { + assert.deepEqual(pool.getStatus(), { + all : 1, + use : 0, + idle : 1, + queue : 0 + }); + + pool.end(function() { + server.destroy(); + }); + }, 1500); +}); diff --git a/test/unit/pool/test-max-reuse-count.js b/test/unit/pool/test-max-reuse-count.js new file mode 100644 index 000000000..8ded173fc --- /dev/null +++ b/test/unit/pool/test-max-reuse-count.js @@ -0,0 +1,30 @@ +var assert = require('assert'); +var common = require('../../common'); + +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + maxReuseCount : 1 +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + pool.getConnection(function (err, connection) { + assert.ifError(err); + connection.release(); + + assert.equal(pool.getStatus().all, 1); + + pool.getConnection(function (err, connection) { + assert.ifError(err); + connection.release(); + + assert.equal(pool.getStatus().all, 0); + + server.destroy(); + }); + }); +}); diff --git a/test/unit/pool/test-min-idle.js b/test/unit/pool/test-min-idle.js new file mode 100644 index 000000000..152902710 --- /dev/null +++ b/test/unit/pool/test-min-idle.js @@ -0,0 +1,34 @@ +var assert = require('assert'); +var common = require('../../common'); + +var pool = common.createPool({ + connectionLimit : 2, + port : common.fakeServerPort, + minIdle : 2, + timeBetweenEvictionRunsMillis : 1000 +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function(err) { + assert.ifError(err); + + var count = 0; + + pool.on('connection', function() { + if (++count === 2) { + setTimeout(function() { + assert.deepEqual(pool.getStatus(), { + all : 2, + use : 0, + idle : 2, + queue : 0 + }); + + pool.end(function() { + server.destroy(); + }); + }, 500); + } + }); +}); diff --git a/test/unit/pool/test-pool-connection-manager-data.js b/test/unit/pool/test-pool-connection-manager-data.js new file mode 100644 index 000000000..fcb44f0bb --- /dev/null +++ b/test/unit/pool/test-pool-connection-manager-data.js @@ -0,0 +1,108 @@ +var assert = require('assert'); +var common = require('../../common'); +var test = require('utest'); + +var PoolConnectionManagerData = require(common.lib + '/PoolConnectionManagerData'); + +var DummyConnection = function(id, value) { + this.id = id; + this.value = value; +}; + +DummyConnection.prototype.getId = function() { + return this.id; +}; + +test('PoolConfig#AllConnection', { + 'works with basic operations': function() { + var allConnection = new PoolConnectionManagerData.AllConnection(); + + allConnection.add(new DummyConnection(1, 'a')); + allConnection.add(new DummyConnection(2, 'b')); + allConnection.add(new DummyConnection(3, 'c')); + + assert.equal(allConnection.size(), 3); + assert.equal(allConnection.get(2).value, 'b'); + + allConnection.remove(new DummyConnection(2)); + assert.equal(allConnection.size(), 2); + assert.equal(allConnection.get(2), null); + + var ids = []; + allConnection.destroy(function(connection) { + ids.push(connection.getId()); + }); + + assert.deepEqual(ids, [1, 3]); + assert.ok(allConnection.isEmpty()); + } +}); + +test('PoolConfig#IdleConnection', { + 'works with basic operations': function() { + var idleConnection = new PoolConnectionManagerData.IdleConnection(); + idleConnection.add(new DummyConnection(1)); + idleConnection.add(new DummyConnection(2)); + + // stack : 1, 2 + assert.equal(idleConnection.size(), 2); + + // stack : 1 + assert.equal(idleConnection.pop(), 2); + + // stack : 1, 3, 4 + idleConnection.add(new DummyConnection(3)); + idleConnection.add(new DummyConnection(4)); + + var twoItems = idleConnection.lookup(2); + assert.deepEqual(twoItems, [1, 3]); + + // stack : 1, 4 + idleConnection.remove(new DummyConnection(3)); + + assert.equal(idleConnection.size(), 2); + assert.equal(idleConnection.pop(), 4); + assert.equal(idleConnection.pop(), 1); + assert.equal(idleConnection.pop(), null); + assert.ok(idleConnection.isEmpty()); + } +}); + +test('PoolConfig#CallbackQueue', { + 'works with basic operations': function() { + var callbackQueue = new PoolConnectionManagerData.CallbackQueue(); + + function dummyCallback(connection) { + connection(); + } + + function dummyCallback2(connection) { + connection(); + } + + // add + callbackQueue.add(dummyCallback); + callbackQueue.add(dummyCallback2); + + assert.equal(callbackQueue.size(), 2); + + // pop + var callback = callbackQueue.pop(); + assert.equal(callback, dummyCallback); + assert.equal(callbackQueue.size(), 1); + + // rollback + callbackQueue.rollback(callback); + assert.equal(callbackQueue.size(), 2); + assert.deepEqual(callbackQueue._queue[0].callback, dummyCallback); + + // destory + var callbacks = []; + callbackQueue.destroy(function(callback) { + callbacks.push(callback); + }); + + assert.deepEqual(callbacks, [dummyCallback, dummyCallback2]); + assert.ok(callbackQueue.isEmpty()); + } +}); diff --git a/test/unit/pool/test-queue-timeout.js b/test/unit/pool/test-queue-timeout.js new file mode 100644 index 000000000..f04f0b7d9 --- /dev/null +++ b/test/unit/pool/test-queue-timeout.js @@ -0,0 +1,32 @@ +var assert = require('assert'); +var common = require('../../common'); +var pool = common.createPool({ + connectionLimit : 1, + port : common.fakeServerPort, + queueTimeout : 100 +}); + +var server = common.createFakeServer(); + +server.listen(common.fakeServerPort, function(err){ + assert.ifError(err); + + pool.getConnection(function(err, conn){ + assert.ifError(err); + + setTimeout(function() { + conn.release(); + }, 200); + }); + + pool.getConnection(function(err) { + assert.ok(err, 'got error'); + assert.equal(err.code, 'POOL_QUEUETIMEOUT'); + }); + + pool.getConnection(function(err) { + assert.ok(err, 'got error'); + assert.equal(err.code, 'POOL_QUEUETIMEOUT'); + server.destroy(); + }); +}); diff --git a/test/unit/pool/test-release-event.js b/test/unit/pool/test-release-event.js index 3d175ef19..2b4bf7b53 100644 --- a/test/unit/pool/test-release-event.js +++ b/test/unit/pool/test-release-event.js @@ -24,15 +24,7 @@ server.listen(common.fakeServerPort, function (err) { connection.release(); assert.equal(count, 1); assert.equal(threadId, connection.threadId); - }); - pool.getConnection(function (err, connection) { - assert.ifError(err); - assert.ok(connection); - assert.equal(count, 1); - connection.release(); - assert.equal(count, 2); - assert.equal(threadId, connection.threadId); pool.end(function (err) { assert.ifError(err); server.destroy();