diff --git a/Readme.md b/Readme.md index 35ec4ccbd..a54db78fd 100644 --- a/Readme.md +++ b/Readme.md @@ -138,7 +138,7 @@ When establishing a connection, you can set the following options: * `user`: The MySQL user to authenticate as. * `password`: The password of that MySQL user. * `database`: Name of the database to use for this connection (Optional). -* `charset`: The charset for the connection. (Default: `'UTF8_GENERAL_CI'`) +* `charset`: The charset for the connection. (Default: `'UTF8_GENERAL_CI'`. Value needs to be all in upper case letters!) * `timezone`: The timezone used to store local dates. (Default: `'local'`) * `stringifyObjects`: Stringify objects instead of converting to values. See issue [#501](https://github.com/felixge/node-mysql/issues/501). (Default: `'false'`) @@ -252,6 +252,42 @@ up to 100 connections, but only ever use 5 simultaneously, only 5 connections will be made. Connections are also cycled round-robin style, with connections being taken from the top of the pool and returning to the bottom. +If you need to create connections when the pool is started, you can set `initialSize` option. + +```js +var pool = mysql.createPool({ + host : 'example.org', + user : 'bob', + password : 'secret', + initialSize : 10 +}); + +// You can listen to the `initialized` event (emit after all connections are created) +pool.on('initialized', function(poolSize) { +}); + +// Return connection while pool is initializing. (after connection is created) +pool.getConnection(function(err, connection) { +}); +``` + +If you need to manage idle connections, you can set `minIdle`, `maxIdle` option. +The pool will create or remove connections per `checkIdleInterval` (`minIdle` < free connections < `maxIdle`). +If you set `maxWait` option, getConnection() function has a timeout. + +```js +var pool = mysql.createPool({ + host : 'example.org', + user : 'bob', + password : 'secret', + minIdle: 10, + maxIdle: 30, + maxWait: 3000, // throw error if a connection is not returned within 3 seconds. + checkIdleInterval: 600000, // check per 10 minutes (default value) + checkIdleNumPerRun: 3 // 3 connections are created or removed per 10 minutes (default value) +}); +``` + ## Pool options Pools accept all the same options as a connection. When creating a new @@ -269,6 +305,74 @@ addition to those options pools accept a few extras: * `queueLimit`: The maximum number of connection requests the pool will queue before returning an error from `getConnection`. If set to `0`, there is no limit to the number of queued connection requests. (Default: `0`) +* `maxWait`: The maximum number of milliseconds that the pool will wait for a + connection to be returned. (Default: `0`) +* `initialSize`: The initial number of connections that are created when the + pool is started. (Default: `0`) +* `minIdle`: The minimum number of connections that can remain idle in the pool. + (Default: `0`) +* `maxIdle`: The maximum number of connections that can remain idle in the pool. + (Default: `0`) +* `checkIdleInterval`: The number of milliseconds to check idle in the pool. + (Default: `600000`) +* `checkIdleNumPerRun`: The number of objects to examine(create/remove) + during each run of the idle object check. (Default: `3`) + +## PoolCluster + +PoolCluster provides multiple hosts connection. (group & retry & selector) + +```js +// create +var poolCluster = mysql.createPoolCluster(); + +poolCluster.add(config); // anonymous group +poolCluster.add('MASTER', masterConfig); +poolCluster.add('SLAVE1', slave1Config); +poolCluster.add('SLAVE2', slave2Config); + +// Target Group : ALL(anonymous, MASTER, SLAVE1-2), Selector : round-robin(default) +poolCluster.getConnection(function (err, connection) {}); + +// Target Group : MASTER, Selector : round-robin +poolCluster.getConnection('MASTER', function (err, connection) {}); + +// Target Group : SLAVE1-2, Selector : order +// If can't connect to SLAVE1, return SLAVE2. (remove SLAVE1 in the cluster) +poolCluster.on('remove', function (nodeId) { + console.log('REMOVED NODE : ' + nodeId); // nodeId = SLAVE1 +}); + +poolCluster.getConnection('SLAVE*', 'ORDER', function (err, connection) {}); + +// of namespace : of(pattern, selector) +poolCluster.of('*').getConnection(function (err, connection) {}); + +var pool = poolCluster.of('SLAVE*', 'RANDOM') +pool.getConnection(function (err, connection) {}); +pool.getConnection(function (err, connection) {}); + +// destroy +poolCluster.end(); +``` + +## PoolCluster Option +* `canRetry`: If `true`, `PoolCluster` will attempt to reconnect when connection fails. (Default: `true`) +* `removeNodeErrorCount`: If connection fails, node's `errorCount` increases. + When `errorCount` is greater than `removeNodeErrorCount`, remove a node in the `PoolCluster`. (Default: `5`) +* `defaultSelector`: The default selector. (Default: `RR`) + * `RR`: Select one alternately. (Round-Robin) + * `RANDOM`: Select the node by random function. + * `ORDER`: Select the first node available unconditionally. + +```js +var clusterConfig = { + removeNodeErrorCount: 1, // Remove the node immediately when connection fails. + defaultSelector: 'ORDER' +}; + +var poolCluster = mysql.createPoolCluster(clusterConfig); +``` ## Switching users / altering connection state @@ -880,6 +984,14 @@ For example, if you have an installation of mysql running on localhost:3306 and MYSQL_HOST=localhost MYSQL_PORT=3306 MYSQL_DATABASE=node_mysql_test MYSQL_USER=root MYSQL_PASSWORD= make test ``` +## Running unit tests on windows + +* Edit the variables in the file ```make.bat``` according to your system and mysql-settings. +* Make sure the database (e.g. 'test') you want to use exists and the user you entered has the proper rights to use the test database. (E.g. do not forget to execute the SQL-command ```FLUSH PRIVILEGES``` after you have created the user.) +* In a DOS-box (or CMD-shell) in the folder of your application run ```npm install mysql --dev``` or in the mysql folder (```node_modules\mysql```), run ```npm install --dev```. (This will install additional developer-dependencies for node-mysql.) +* Run ```npm test mysql``` in your applications folder or ```npm test``` in the mysql subfolder. +* If you want to log the output into a file use ```npm test mysql > test.log``` or ```npm test > test.log```. + ## Todo * Prepared statements diff --git a/index.js b/index.js index f589e7f45..8f41182ae 100644 --- a/index.js +++ b/index.js @@ -4,6 +4,7 @@ var Types = require('./lib/protocol/constants/types'); var SqlString = require('./lib/protocol/SqlString'); var Pool = require('./lib/Pool'); var PoolConfig = require('./lib/PoolConfig'); +var PoolCluster = require('./lib/PoolCluster'); exports.createConnection = function(config) { return new Connection({config: new ConnectionConfig(config)}); @@ -13,6 +14,10 @@ exports.createPool = function(config) { return new Pool({config: new PoolConfig(config)}); }; +exports.createPoolCluster = function(config) { + return new PoolCluster(config); +}; + exports.createQuery = Connection.createQuery; exports.Types = Types; diff --git a/lib/Connection.js b/lib/Connection.js index 94677d8e3..1799465e0 100644 --- a/lib/Connection.js +++ b/lib/Connection.js @@ -105,7 +105,6 @@ Connection.prototype.query = function(sql, values, cb) { } query.sql = this.format(query.sql, query.values || []); - delete query.values; return this._protocol._enqueue(query); }; @@ -153,6 +152,10 @@ Connection.prototype.format = function(sql, values) { return SqlString.format(sql, values, this.config.stringifyObjects, this.config.timezone); }; +Connection.prototype.reset = function() { + this._protocol.reset(); +}; + Connection.prototype._handleNetworkError = function(err) { this._protocol.handleNetworkError(err); }; diff --git a/lib/ConnectionConfig.js b/lib/ConnectionConfig.js index fdacfea8a..24ebf7a4c 100644 --- a/lib/ConnectionConfig.js +++ b/lib/ConnectionConfig.js @@ -6,6 +6,8 @@ module.exports = ConnectionConfig; function ConnectionConfig(options) { if (typeof options === 'string') { options = ConnectionConfig.parseUrl(options); + } else { + options = options || {}; } this.host = options.host || 'localhost'; @@ -23,16 +25,16 @@ function ConnectionConfig(options) { this.flags = options.flags || ''; this.queryFormat = options.queryFormat; this.pool = options.pool || undefined; - this.multipleStatements = options.multipleStatements || false; + this.multipleStatements = options.multipleStatements || false; this.typeCast = (options.typeCast === undefined) ? true : options.typeCast; - if (this.timezone[0] == " ") { + if (this.timezone[0] === ' ') { // "+" is a url encoded char for space so it // gets translated to space when giving a // connection string.. - this.timezone = "+" + this.timezone.substr(1); + this.timezone = '+' + this.timezone.substr(1); } this.maxPacketSize = 0; @@ -40,8 +42,7 @@ function ConnectionConfig(options) { ? ConnectionConfig.getCharsetNumber(options.charset) : options.charsetNumber||Charsets.UTF8_GENERAL_CI; - this.clientFlags = ConnectionConfig.mergeFlags(ConnectionConfig.getDefaultFlags(options), - options.flags || ''); + this.clientFlags = ConnectionConfig.mergeFlags(ConnectionConfig.getDefaultFlags(options), this.flags); } ConnectionConfig.mergeFlags = function(default_flags, user_flags) { @@ -89,7 +90,7 @@ ConnectionConfig.parseUrl = function(url) { var options = { host : url.hostname, port : url.port, - database : url.pathname.substr(1), + database : url.pathname.substr(1) }; if (url.auth) { diff --git a/lib/Pool.js b/lib/Pool.js index 9fca0db06..d95e01ace 100644 --- a/lib/Pool.js +++ b/lib/Pool.js @@ -2,6 +2,8 @@ var mysql = require('../'); var Connection = require('./Connection'); var EventEmitter = require('events').EventEmitter; var Util = require('util'); +var PoolManager = require('./PoolExtension').PoolManager; +var PoolConnection = require('./PoolExtension').PoolConnection; module.exports = Pool; @@ -9,84 +11,105 @@ Util.inherits(Pool, EventEmitter); function Pool(options) { EventEmitter.call(this); this.config = options.config; - this.config.connectionConfig.pool = this; - this._allConnections = []; - this._freeConnections = []; - this._connectionQueue = []; + this._allConnections = new PoolConnection.All(); + this._freeConnections = new PoolConnection.Free(); + this._connectionQueue = new PoolConnection.Queue(this); + this._poolManager = new PoolManager(this); this._closed = false; + + this._initialize(); } Pool.prototype.getConnection = function (cb) { if (this._closed) { - return cb(new Error('Pool is closed.')); + return process.nextTick(this._raiseErrorToCB.bind(this, cb, 'Pool is closed.')); } - var connection; - - if (this._freeConnections.length > 0) { - connection = this._freeConnections.shift(); + if (this._poolManager.isNotStarted()) { + this._connectionQueue.add(cb); + return; + } - return cb(null, connection); + var connection = this._freeConnections.get(); + if (connection) { + return process.nextTick(cb.bind(null, null, connection)); } - if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) { + if (this.config.connectionLimit === 0 || this._allConnections.length() < this.config.connectionLimit) { connection = this._createConnection(); - this._allConnections.push(connection); + this._allConnections.incrLength(); - return connection.connect(function(err) { + return this._poolManager.connect(connection, function(err) { if (this._closed) { - return cb(new Error('Pool is closed.')); + return this._raiseErrorToCB(cb, 'Pool is closed.'); } if (err) { - return cb(err); + this._allConnections.decrLength(); + return this._raiseErrorToCB(cb, err); } + this._allConnections.add(connection); + this.emit('connection', connection); + return cb(null, connection); }.bind(this)); } if (!this.config.waitForConnections) { - return cb(new Error('No connections available.')); + return process.nextTick(this._raiseErrorToCB.bind(this, cb, 'No connections available.')); } - if (this.config.queueLimit && this._connectionQueue.length >= this.config.queueLimit) { - return cb(new Error('Queue limit reached.')); + if (this.config.queueLimit && this._connectionQueue.length() >= this.config.queueLimit) { + return this._raiseErrorToCB(cb, 'Queue limit reached.'); } - this._connectionQueue.push(cb); + this._connectionQueue.add(cb); }; Pool.prototype.releaseConnection = function (connection) { var cb; + connection.reset(); + if (connection._poolRemoved) { // The connection has been removed from the pool and is no longer good. - if (this._connectionQueue.length) { - cb = this._connectionQueue.shift(); - + cb = this._connectionQueue.get(); + if (cb) { process.nextTick(this.getConnection.bind(this, cb)); } - } else if (this._connectionQueue.length) { - cb = this._connectionQueue.shift(); - + } else if ((cb = this._connectionQueue.get())) { process.nextTick(cb.bind(null, null, connection)); } else { - this._freeConnections.push(connection); + this._freeConnections.add(connection); } }; Pool.prototype.end = function (cb) { + if (this._poolManager.isNotStarted()) { + return this._poolManager.afterStarted(this.end.bind(this, cb)); + } + + if (this._closed) { + return this._raiseErrorToCB(cb, 'Pool is closing'); + } + this._closed = true; - if (typeof cb != "function") { - cb = function (err) { + this._poolManager.end(); + + if (typeof cb !== 'function') { + cb = function(err) { if (err) throw err; }; } + if (this._allConnections.length() === 0) { + return cb(null); + } + var calledBack = false; var closedConnections = 0; var connection; @@ -96,24 +119,17 @@ Pool.prototype.end = function (cb) { return; } - if (err || ++closedConnections >= this._allConnections.length) { + if (err || ++closedConnections >= this._allConnections.length()) { calledBack = true; - delete endCB; return cb(err); } }.bind(this); - if (this._allConnections.length === 0) { - return endCB(); - } - - for (var i = 0; i < this._allConnections.length; i++) { - connection = this._allConnections[i]; - + this._allConnections.each(function(connection) { connection.destroy = connection._realDestroy; connection.end = connection._realEnd; connection.end(endCB); - } + }); }; Pool.prototype.query = function (sql, values, cb) { @@ -132,6 +148,10 @@ Pool.prototype.query = function (sql, values, cb) { }); }; +Pool.prototype.getFreeSize = function () { + return this._freeConnections.length(); +}; + Pool.prototype._createConnection = function() { var self = this; var connection = (this.config.createConnection) @@ -156,9 +176,27 @@ Pool.prototype._createConnection = function() { connection.on('end', this._handleConnectionEnd.bind(this, connection)); connection.on('error', this._handleConnectionError.bind(this, connection)); + connection._id = this._allConnections.getId(); + return connection; }; +Pool.prototype._removeConnection = function(connection, isFromIdleCheck) { + isFromIdleCheck = isFromIdleCheck || false; + + connection._poolRemoved = true; + connection.end = connection._realEnd; + connection.destroy = connection._realDestroy; + + this._allConnections.remove(connection); + + if (isFromIdleCheck) { + connection.end(); + } else { + this._freeConnections.remove(connection); + } +}; + Pool.prototype._handleConnectionEnd = function(connection) { if (this._closed || connection._poolRemoved) { return; @@ -173,27 +211,17 @@ Pool.prototype._handleConnectionError = function(connection) { this._removeConnection(connection); }; -Pool.prototype._removeConnection = function(connection) { - var i; - - connection._poolRemoved = true; - - for (i = 0; i < this._allConnections.length; i++) { - if (this._allConnections[i] === connection) { - this._allConnections.splice(i, 1); - break; - } - } - - for (i = 0; i < this._freeConnections.length; i++) { - if (this._freeConnections[i] === connection) { - this._freeConnections.splice(i, 1); - break; - } +Pool.prototype._raiseErrorToCB = function(cb, message) { + if (typeof cb === 'function' && cb._PE_CALLED === undefined) { + cb._PE_CALLED = true; + cb(typeof message === 'string' ? new Error(message) : message); } +}; - connection.end = connection._realEnd; - connection.destroy = connection._realDestroy; +Pool.prototype._initialize = function() { + this._poolManager.afterStarted(function() { + this._connectionQueue.each(this.getConnection.bind(this)); + }.bind(this)); - this.releaseConnection(connection); + this._poolManager.start(); }; diff --git a/lib/PoolCluster.js b/lib/PoolCluster.js new file mode 100644 index 000000000..f7a9e3085 --- /dev/null +++ b/lib/PoolCluster.js @@ -0,0 +1,241 @@ +var Pool = require('./Pool'); +var PoolConfig = require('./PoolConfig'); +var Util = require('util'); +var EventEmitter = require('events').EventEmitter; + +module.exports = PoolCluster; + +/** + * PoolCluster + */ +function PoolCluster(config) { + EventEmitter.call(this); + + config = config || {}; + this._canRetry = typeof config.canRetry === 'undefined' ? true : config.canRetry; + this._removeNodeErrorCount = config.removeNodeErrorCount || 5; + this._defaultSelector = config.defaultSelector || 'RR'; + + this._closed = false; + this._lastId = 0; + this._nodes = {}; + this._serviceableNodeIds = []; + this._namespaces = {}; + this._findCaches = {}; +} + +Util.inherits(PoolCluster, EventEmitter); + +PoolCluster.prototype.of = function(pattern, selector) { + pattern = pattern || '*'; + + selector = selector || this._defaultSelector; + selector = selector.toUpperCase(); + if (typeof Selector[selector] === 'undefined') { + selector = this._defaultSelector; + } + + var key = pattern + selector; + + if (typeof this._namespaces[key] === 'undefined') { + this._namespaces[key] = new PoolNamespace(this, pattern, selector); + } + + return this._namespaces[key]; +}; + +PoolCluster.prototype.add = function(id, config) { + if (typeof id === 'object') { + config = id; + id = 'CLUSTER::' + (++this._lastId); + } + + if (typeof this._nodes[id] === 'undefined') { + this._nodes[id] = { + id: id, + errorCount: 0, + pool: new Pool({config: new PoolConfig(config)}) + }; + + this._serviceableNodeIds.push(id); + + this._clearFindCaches(); + } +}; + +PoolCluster.prototype.getConnection = function(pattern, selector, cb) { + if (typeof pattern === 'function') { + cb = pattern; + namespace = this.of(); + } else { + if (typeof selector === 'function') { + cb = selector; + selector = this._defaultSelector; + } + + namespace = this.of(pattern, selector); + } + + namespace.getConnection(cb); +}; + +PoolCluster.prototype.end = function() { + if (this._closed) { + return; + } + + this._closed = true; + + for (var id in this._nodes) { + this._nodes[id].pool.end(); + } +}; + +PoolCluster.prototype._findNodeIds = function(pattern) { + if (typeof this._findCaches[pattern] !== 'undefined') { + return this._findCaches[pattern]; + } + + var foundNodeIds; + + if (pattern === '*') { // all + foundNodeIds = this._serviceableNodeIds; + } else if (typeof this._serviceableNodeIds[pattern] !== 'undefined') { // one + foundNodeIds = [pattern]; + } else { // wild matching + var keyword = pattern.substring(pattern.length - 1, 0); + + foundNodeIds = this._serviceableNodeIds.filter(function (id) { + return id.indexOf(keyword) === 0; + }); + } + + this._findCaches[pattern] = foundNodeIds; + + return foundNodeIds; +}; + +PoolCluster.prototype._getNode = function(id) { + return this._nodes[id] || null; +}; + +PoolCluster.prototype._increaseErrorCount = function(node) { + if (++node.errorCount >= this._removeNodeErrorCount) { + var index = this._serviceableNodeIds.indexOf(node.id); + if (index !== -1) { + this._serviceableNodeIds.splice(index, 1); + delete this._nodes[node.id]; + + this._clearFindCaches(); + + node.pool.end(); + + this.emit('remove', node.id); + } + } +}; + +PoolCluster.prototype._decreaseErrorCount = function(node) { + if (node.errorCount > 0) { + --node.errorCount; + } +}; + +PoolCluster.prototype._getConnection = function(node, cb) { + var self = this; + + node.pool.getConnection(function (err, connection) { + if (err) { + self._increaseErrorCount(node); + + if (self._canRetry) { + console.warn('[Error] PoolCluster : ' + err); + return cb(null, 'retry'); + } else { + return cb(err); + } + } else { + self._decreaseErrorCount(node); + } + + connection._clusterId = node.id; + + cb(null, connection); + }); +}; + +PoolCluster.prototype._clearFindCaches = function() { + this._findCaches = {}; +}; + +/** + * PoolNamespace + */ +function PoolNamespace(cluster, pattern, selector) { + this._cluster = cluster; + this._pattern = pattern; + this._selector = new Selector[selector](); +} + +PoolNamespace.prototype.getConnection = function(cb) { + var clusterNode = this._getClusterNode(); + + if (clusterNode === null) { + return cb(new Error('Pool does Not exists.')); + } + + this._cluster._getConnection(clusterNode, function(err, connection) { + if (err) { + return cb(err); + } + + if (connection === 'retry') { + return this.getConnection(cb); + } + + cb(null, connection); + }.bind(this)); +}; + +PoolNamespace.prototype._getClusterNode = function() { + var foundNodeIds = this._cluster._findNodeIds(this._pattern); + + if (foundNodeIds.length === 0) { + return null; + } + + var nodeId = (foundNodeIds.length === 1) ? foundNodeIds[0] : this._selector(foundNodeIds); + + return this._cluster._getNode(nodeId); +}; + +/** + * Selector + */ +var Selector = {}; + +Selector.RR = function () { + var index = 0; + + return function(clusterIds) { + if (index >= clusterIds.length) { + index = 0; + } + + var clusterId = clusterIds[index++]; + + return clusterId; + }; +}; + +Selector.RANDOM = function () { + return function(clusterIds) { + return clusterIds[Math.floor(Math.random() * clusterIds.length)]; + }; +}; + +Selector.ORDER = function () { + return function(clusterIds) { + return clusterIds[0]; + }; +}; diff --git a/lib/PoolConfig.js b/lib/PoolConfig.js index 455c8ee17..d97a0d1f3 100644 --- a/lib/PoolConfig.js +++ b/lib/PoolConfig.js @@ -14,4 +14,22 @@ function PoolConfig(options) { this.queueLimit = (options.queueLimit === undefined) ? 0 : Number(options.queueLimit); + this.maxWait = (options.maxWait === undefined) + ? 0 + : Number(options.maxWait); + this.initialSize = (options.initialSize === undefined) + ? 0 + : Number(options.initialSize); + this.minIdle = (options.minIdle === undefined) + ? 0 + : Number(options.minIdle); + this.maxIdle = (options.maxIdle === undefined) + ? 0 + : Number(options.maxIdle); + this.idleCheckInterval = (options.idleCheckInterval === undefined) + ? 600000 // 10 minutes + : Number(options.idleCheckInterval); + this.idleCheckNumPerRun = (options.idleCheckNumPerRun === undefined) + ? 3 + : Number(options.idleCheckNumPerRun); } diff --git a/lib/PoolExtension.js b/lib/PoolExtension.js new file mode 100644 index 000000000..ca349146f --- /dev/null +++ b/lib/PoolExtension.js @@ -0,0 +1,339 @@ +var EventEmitter = require('events').EventEmitter; +var Util = require('util'); + +// ============================================================================ +// PoolManager +// ============================================================================ + +exports.PoolManager = PoolManager; + +function PoolManager(pool) { + EventEmitter.call(this); + + this.setMaxListeners(0); + + this._pool = pool; + this._started = false; + this._checkIdleHandle = null; + this._timeoutTimer = {}; + + this._generateConfigVariables(); +} + +Util.inherits(PoolManager, EventEmitter); + +PoolManager.prototype.start = function() { + this._createInitialConnections(); +}; + +PoolManager.prototype.end = function() { + if (this._checkIdleHandle !== null) { + clearInterval(this._checkIdleHandle); + this._checkIdleHandle = null; + } +}; + +PoolManager.prototype.isNotStarted = function() { + return this._started !== true; +}; + +PoolManager.prototype.afterStarted = function(cb) { + this.on('started', cb); +}; + +PoolManager.prototype.connect = function (connection, cb) { + if (this._maxWait <= 0) { + return connection.connect(cb.bind(null)); + } + + this._timeoutTimer[connection._id] = setTimeout(function() { + connection.destroy(); + cb(new Error('Timed out (DB Connection)')); + }, this._maxWait); + + connection.connect(function(err) { + clearTimeout(this._timeoutTimer[connection._id]); + cb(err); + }.bind(this)); +}; + +PoolManager.prototype._generateConfigVariables = function() { + var config = this._pool.config; + + this._initialSize = config.initialSize; + this._minIdle = config.minIdle; + this._maxIdle = config.maxIdle; + this._idleCheckInterval = config.idleCheckInterval; + this._idleCheckNumPerRun = config.idleCheckNumPerRun; + this._maxWait = config.maxWait; +}; + +PoolManager.prototype._createInitialConnections = function() { + if (this._initialSize <= 0) { + return this._endInitialConnections(0); + } + + var attemptedConnect = 0; + var attachedConnect = 0; + + this._executeFunction(function () { + var connection = this._pool._createConnection(); + + this.connect(connection, function(err) { + if (!err) { + attachedConnect++; + this._attachConnection(connection); + } + + if (++attemptedConnect === this._initialSize) { + this._endInitialConnections(attachedConnect); + } + }.bind(this)); + }).times(this._initialSize); +}; + +PoolManager.prototype._endInitialConnections = function(poolSize) { + this._started = true; + + process.nextTick(this._pool.emit.bind(this._pool, 'initialized', poolSize)); + + this._startIdleAction(); + + this.emit('started'); +}; + +PoolManager.prototype._startIdleAction = function() { + this._maxIdle = Math.max(this._maxIdle, this._minIdle); + + if (this._minIdle > 0 || this._maxIdle > 0) { + this._checkIdleHandle = setInterval(this._checkIdle.bind(this), this._idleCheckInterval); + } +}; + +PoolManager.prototype._checkIdle = function() { + // check minIdle + if (this._minIdle > 0) { + var needfulCreateConnection = this._getNeedCount(this._minIdle, this._pool.getFreeSize()); + if (needfulCreateConnection > 0) { + this._executeFunction(function() { + var connection = this._pool._createConnection(); + + connection.connect(function (err) { + if (!err) { + this._attachConnection(connection); + } + }.bind(this)); + }).times(needfulCreateConnection); + + return; + } + } + + // check maxIdle + if (this._maxIdle > 0) { + var needfulRemoveConnection = this._getNeedCount(this._pool.getFreeSize(), this._maxIdle); + if (needfulRemoveConnection > 0) { + this._executeFunction(this._detachAnyConnection.bind(this)).times(needfulRemoveConnection); + } + } +}; + +PoolManager.prototype._getNeedCount = function(targetA, targetB) { + var count = targetA - targetB; + if (count <= 0 || this._idleCheckNumPerRun === 0) { + return count; + } + + return Math.min(count, this._idleCheckNumPerRun); +}; + +PoolManager.prototype._executeFunction = function (fn) { + return { + _scope: this, + _fn: fn, + times: function(num) { + if (typeof this._fn !== 'function') { + return; + } + + for (var i = 0; i < num; i++) { + process.nextTick(this._fn.bind(this._scope)); + } + } + }; +}; + +PoolManager.prototype._attachConnection = function (connection) { + var pool = this._pool; + + pool._allConnections.add(connection, true); + + pool.emit('connection', connection); + + var cb = pool._connectionQueue.get(); + if (cb) { + process.nextTick(cb.bind(null, null, connection)); + } else { + pool._freeConnections.add(connection); + } +}; + +PoolManager.prototype._detachAnyConnection = function () { + var pool = this._pool; + + var connection = pool._freeConnections.get(); + if (connection) { + pool._removeConnection(connection, true); + } +}; + +// ============================================================================ +// PoolConnection +// ============================================================================ + +var PoolConnection = exports.PoolConnection = {}; + +// All Connections +PoolConnection.All = function() { + this._length = 0; + this._lastId = 0; + this._storage = {}; +}; + +PoolConnection.All.prototype.getId = function() { + return ++this._lastId; +}; + +PoolConnection.All.prototype.length = function() { + return this._length; +}; + +PoolConnection.All.prototype.add = function(connection, incr) { + this._storage[connection._id] = connection; + + if (incr === true) { + this.incrLength(); + } +}; + +PoolConnection.All.prototype.remove = function(connection) { + delete this._storage[connection._id]; + this.decrLength(); +}; + +PoolConnection.All.prototype.each = function(fn) { + for (var key in this._storage) { + fn(this._storage[key]); + } +}; + +PoolConnection.All.prototype.incrLength = function() { + this._length++; +}; + +PoolConnection.All.prototype.decrLength = function() { + this._length--; +}; + +// Free Connections +PoolConnection.Free = function() { + this._storage = []; +}; + +PoolConnection.Free.prototype.length = function() { + return this._storage.length; +}; + +PoolConnection.Free.prototype.get = function() { + return (this._storage.length <= 0) ? null : this._storage.shift(); +}; + +PoolConnection.Free.prototype.add = function(connection) { + this._storage.push(connection); +}; + +PoolConnection.Free.prototype.remove = function(connection) { + for (var i = 0, len = this._storage.length; i < len; i++) { + if (this._storage[i]._id === connection._id) { + this._storage.splice(i, 1); + break; + } + } +}; + +// Client Queue +PoolConnection.Queue = function(pool) { + this._lastId = 0; + this._storage = []; + this._maxWait = pool.config.maxWait; + this._timeoutTimer = {}; +}; + +PoolConnection.Queue.prototype.length = function() { + return this._storage.length; +}; + +PoolConnection.Queue.prototype.add = function(cb) { + var id = this._add(cb); + + if (this._maxWait > 0) { + this._timeoutTimer[id] = setTimeout(this._timeout.bind(this, id), this._maxWait); + } +}; + +PoolConnection.Queue.prototype.get = function() { + var obj = this._get(); + if (!obj) { + return null; + } + + if (this._maxWait > 0) { + clearTimeout(this._timeoutTimer[obj._id]); + } + + return obj._cb; +}; + +PoolConnection.Queue.prototype.each = function(fn) { + this._storage.forEach(function(obj) { + obj._cb._PE_CALLED = true; + process.nextTick(fn.bind(null, obj._cb)); + }); +}; + +PoolConnection.Queue.prototype._timeout = function(id) { + var cb = this._remove(id); + if (cb && cb._PE_CALLED === undefined) { + cb._PE_CALLED = true; + cb(new Error('Timed out (getConnection)')); + } +}; + +PoolConnection.Queue.prototype._add = function(cb) { + var obj = { + _id: ++this._lastId, + _cb: cb + }; + + this._storage.push(obj); + + return obj._id; +}; + +PoolConnection.Queue.prototype._get = function() { + return (this._storage.length <= 0) ? null : this._storage.shift(); +}; + +PoolConnection.Queue.prototype._remove = function(id) { + var cb = null; + + for (var i = 0, len = this._storage.length; i < len; i++) { + if (this._storage[i]._id === id) { + cb = this._storage[i]._cb; + this._storage.splice(i, 1); + break; + } + } + + return cb; +}; diff --git a/lib/protocol/Protocol.js b/lib/protocol/Protocol.js index a6c273132..a7f3482c1 100644 --- a/lib/protocol/Protocol.js +++ b/lib/protocol/Protocol.js @@ -307,3 +307,8 @@ Protocol.prototype._debugPacket = function(incoming, packet) { console.log(packet); console.log(''); }; + +Protocol.prototype.reset = function() { + this._queue.length = 0; +}; + diff --git a/lib/protocol/packets/FieldPacket.js b/lib/protocol/packets/FieldPacket.js index d8dbebbc4..dcbf8648a 100644 --- a/lib/protocol/packets/FieldPacket.js +++ b/lib/protocol/packets/FieldPacket.js @@ -30,7 +30,7 @@ FieldPacket.prototype.parse = function(parser) { this.orgName = parser.parseLengthCodedString(); this.filler1 = parser.parseFiller(1); this.charsetNr = parser.parseUnsignedNumber(2); - this.fieldLength = parser.parseUnsignedNumber(4); + this.length = parser.parseUnsignedNumber(4); this.type = parser.parseUnsignedNumber(1); this.flags = parser.parseUnsignedNumber(2); this.decimals = parser.parseUnsignedNumber(1); @@ -47,7 +47,7 @@ FieldPacket.prototype.parse = function(parser) { } else { this.table = parser.parseLengthCodedString(); this.name = parser.parseLengthCodedString(); - this.fieldLength = parser.parseUnsignedNumber(parser.parseUnsignedNumber(1)); + this.length = parser.parseUnsignedNumber(parser.parseUnsignedNumber(1)); this.type = parser.parseUnsignedNumber(parser.parseUnsignedNumber(1)); } }; @@ -62,7 +62,7 @@ FieldPacket.prototype.write = function(writer) { writer.writeLengthCodedString(this.orgName); writer.writeFiller(1); writer.writeUnsignedNumber(2, this.charsetNr || 0); - writer.writeUnsignedNumber(4, this.fieldLength || 0); + writer.writeUnsignedNumber(4, this.length || 0); writer.writeUnsignedNumber(1, this.type || 0); writer.writeUnsignedNumber(2, this.flags || 0); writer.writeUnsignedNumber(1, this.decimals || 0); @@ -75,7 +75,7 @@ FieldPacket.prototype.write = function(writer) { writer.writeLengthCodedString(this.table); writer.writeLengthCodedString(this.name); writer.writeUnsignedNumber(1, 0x01); - writer.writeUnsignedNumber(1, this.fieldLength); + writer.writeUnsignedNumber(1, this.length); writer.writeUnsignedNumber(1, 0x01); writer.writeUnsignedNumber(1, this.type); } diff --git a/make.bat b/make.bat new file mode 100644 index 000000000..8aa913ca2 --- /dev/null +++ b/make.bat @@ -0,0 +1,29 @@ +@ECHO OFF +REM Simple 'make' replacement for windows-based system, 'npm test' will now find +REM 'make.bat' and sets the MYSQL_ enviroment variables according to this file + +REM Edit the variables according to your system +REM No spaces (' ') between variablenames, '=' and the values! + +REM Host to test with default: localhost +SET MYSQL_HOST=localhost + +REM Default mysql port: 3306 +SET MYSQL_PORT=3306 + +REM For a standard installatons of Wampp using http://www.apachefriends.org/ +REM the default login is 'root' and no password, but any user should do, change +REM the settings according to your installation +SET MYSQL_USER=root +SET MYSQL_PASSWORD= + +REM make sure the database you are using exists and the above user has access to +REM it. Normally every user has access to the database 'test' +SET MYSQL_DATABASE=test + +@ECHO ON +node test/run.js +@ECHO OFF +ECHO ********************** +ECHO If the tests should fail, make sure you have SET the correct values for the enviroment variables in the file 'make.bat' +ECHO ********************** \ No newline at end of file diff --git a/test/common.js b/test/common.js index 4984228da..fda4dd091 100644 --- a/test/common.js +++ b/test/common.js @@ -35,6 +35,12 @@ common.createPool = function(config) { return Mysql.createPool(config); }; +common.createPoolCluster = function(config) { + config = mergeTestConfig(config); + config.createConnection = common.createConnection; + return Mysql.createPoolCluster(config); +}; + common.createFakeServer = function(options) { return new FakeServer(_.extend({}, options)); }; @@ -45,7 +51,11 @@ common.useTestDb = function(connection) { }); connection.query('USE ' + common.testDatabase); -} +}; + +common.getTestConfig = function(config) { + return mergeTestConfig(config); +}; function mergeTestConfig(config) { if (common.isTravis()) { diff --git a/test/integration/connection/test-unix-domain-socket.js b/test/integration/connection/test-unix-domain-socket.js index 229395ece..0a63aa228 100644 --- a/test/integration/connection/test-unix-domain-socket.js +++ b/test/integration/connection/test-unix-domain-socket.js @@ -1,9 +1,19 @@ +/** + * This test is skipped, if the environment variable "windir" is set. + * It assumes that it runs on a windows system then. + */ +if (process.env.windir) { + return console.log('Skipping "test-unix-domain-socket.js" - Environment' + + ' variable "windir" is set. Skipping this, because we seem to be on' + + ' a windows system'); +} var common = require('../../common'); var connection = common.createConnection({socketPath: common.fakeServerSocket}); var assert = require('assert'); var server = common.createFakeServer(); var didConnect = false; + server.listen(common.fakeServerSocket, function(err) { if (err) throw err; @@ -27,6 +37,6 @@ server.on('connection', function(connection) { }); process.on('exit', function() { - assert.equal(didConnect, true); - assert.equal(hadConnection, true); + assert.equal(didConnect, true); + assert.equal(hadConnection, true); }); diff --git a/test/integration/pool/test-check-idle.js b/test/integration/pool/test-check-idle.js new file mode 100644 index 000000000..cb6789598 --- /dev/null +++ b/test/integration/pool/test-check-idle.js @@ -0,0 +1,58 @@ +var common = require('../../common'); +var assert = require('assert'); +var Connection = require(common.lib + '/Connection'); + +function testMinIdle() { + var config = common.getTestConfig({ + minIdle: 6, + idleCheckInterval: 2000, + idleCheckNumPerRun: 2 + }); + + var pool = common.createPool(config); + + pool.on('initialized', function() { + for (var i = 1; i <= 4; i++) { + (function(idx) { + setTimeout(function() { + var expectedPoolSize = config.idleCheckNumPerRun * (idx -1); + assert.equal(pool.getFreeSize(), expectedPoolSize); + + if (idx === 4) { + pool.end(); + testMaxIdle(); + } + }, (idx * 2000) - 1000); + })(i); + } + }); +} + +function testMaxIdle() { + var config = common.getTestConfig({ + initialSize: 10, + maxIdle: 4, + idleCheckInterval: 1000, + idleCheckNumPerRun: 2 + }); + + var pool = common.createPool(config); + + pool.on('initialized', function() { + for (var i = 1; i <= 4; i++) { + (function(idx) { + setTimeout(function() { + var expectedPoolSize = config.initialSize - config.idleCheckNumPerRun * (idx -1); + + assert.equal(pool.getFreeSize(), expectedPoolSize); + + if (idx === 4) { + pool.end(); + } + }, (idx * 1000) - 500); + })(i); + } + }); +} + +testMinIdle(); diff --git a/test/integration/pool/test-cluster.js b/test/integration/pool/test-cluster.js new file mode 100644 index 000000000..6d1975a09 --- /dev/null +++ b/test/integration/pool/test-cluster.js @@ -0,0 +1,233 @@ +var common = require('../../common'); +var assert = require('assert'); + +function createPoolCluster(clusterConfig, poolConfig) { + var cluster = common.createPoolCluster(clusterConfig); + + if (typeof poolConfig === 'undefined') { + poolConfig = common.getTestConfig(); + } + + cluster.add(poolConfig); + cluster.add('MASTER', poolConfig); + cluster.add('SLAVE1', poolConfig); + cluster.add('SLAVE2', poolConfig); + + return cluster; +} + +// Test_base_function +(function () { + var cluster = createPoolCluster(); + + // added nodes + assert.deepEqual(cluster._serviceableNodeIds, ['CLUSTER::1', 'MASTER', 'SLAVE1', 'SLAVE2']); + + // _findNodeIds + assert.deepEqual(cluster._findNodeIds('MASTER'), ['MASTER']); + assert.deepEqual(cluster._findNodeIds('SLAVE*'), ['SLAVE1', 'SLAVE2']); + + // of singletone instance + var poolNamespace = cluster.of('*', 'RR'); + var poolNamespace2 = cluster.of('*'); + assert.strictEqual(poolNamespace, poolNamespace2); + + // empty pattern + var emptyPoolNamespace = cluster.of(); + assert.strictEqual(poolNamespace, emptyPoolNamespace); + + // wrong selector + var wrongPoolNamespace = cluster.of('*', 'RR2'); + assert.strictEqual(poolNamespace, wrongPoolNamespace); + + cluster.end(); +})(); + +// Test_getConnection_one +(function() { + var cluster = createPoolCluster(); + + cluster.getConnection('MASTER', function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'MASTER'); + } + }.bind(this)); +})(); + +// Test_of_getConnection_one +(function() { + var cluster = createPoolCluster(); + + cluster.of('MASTER').getConnection(function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'MASTER'); + } + }.bind(this)); +})(); + +// Test_getConnection_multi +(function() { + var cluster = createPoolCluster(); + + cluster.getConnection('SLAVE*', 'RR', function(err, connection) { + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + + cluster.getConnection('SLAVE*', 'RR', function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE2'); + } + }); + }); +})(); + +// Test_of_getConnection_multi +(function() { + var cluster = createPoolCluster(); + var pool = cluster.of('SLAVE*', 'RR'); + + pool.getConnection(function(err, connection) { + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + + pool.getConnection(function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE2'); + } + }); + }); +})(); + +// Test_of_getConnection_ORDER_selector +(function() { + var cluster = createPoolCluster(); + var pool = cluster.of('SLAVE*', 'ORDER'); + + pool.getConnection(function(err, connection) { + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + + pool.getConnection(function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + }); + }); +})(); + + +// Test_of_getConnection_default_selector +(function() { + var cluster = createPoolCluster({ + defaultSelector: 'ORDER' + }); + + var pool = cluster.of('SLAVE*'); + + pool.getConnection(function(err, connection) { + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + + pool.getConnection(function(err, connection) { + cluster.end(); + + if (!err) { + assert.strictEqual(connection._clusterId, 'SLAVE1'); + } + }); + }); +})(); + +// Test_retry_throw_error +(function() { + var cluster = common.createPoolCluster({ + canRetry: false + }); + + var poolConfig = common.getTestConfig(); + + var origPort = poolConfig.port; + poolConfig.port = 3300; + cluster.add('ERROR', poolConfig); + + poolConfig.port = origPort; + cluster.add('CORRECT', poolConfig); + + cluster.of('*').getConnection(function (err, connection) { + cluster.end(); + + assert.ok(err instanceof Error); + }); +})(); + +// Test_retry +(function() { + var cluster = common.createPoolCluster(); + + var poolConfig = common.getTestConfig(); + + var origPort = poolConfig.port; + poolConfig.port = 3300; + cluster.add('ERROR', poolConfig); + + poolConfig.port = origPort; + cluster.add('CORRECT', poolConfig); + + cluster.of('*', 'RR').getConnection(function (err, connection) { + cluster.end(); + + assert.ok(err === null); + assert.equal(connection._clusterId, 'CORRECT'); + + assert.equal(cluster._nodes.ERROR.errorCount, 1); + }); +})(); + +// Test_remove_node +(function() { + var cluster = common.createPoolCluster({ + removeNodeErrorCount: 1 + }); + + var poolConfig = common.getTestConfig(); + + var origPort = poolConfig.port; + poolConfig.port = 3300; + cluster.add('ERROR', poolConfig); + + poolConfig.port = origPort; + cluster.add('CORRECT', poolConfig); + + var removedNodeId = ''; + + cluster.on('remove', function(nodeId) { + removedNodeId = nodeId; + }); + + cluster.of('*', 'RR').getConnection(function (err, connection) { + cluster.end(); + + assert.ok(err === null); + assert.equal(connection._clusterId, 'CORRECT'); + + assert.equal(removedNodeId, 'ERROR'); + + assert.ok(typeof cluster._nodes.ERROR === 'undefined'); + assert.equal(cluster._serviceableNodeIds.length, 1); + assert.deepEqual(cluster._serviceableNodeIds, ['CORRECT']); + }); +})(); diff --git a/test/integration/pool/test-connection-wait.js b/test/integration/pool/test-connection-wait.js new file mode 100644 index 000000000..b256d6e9b --- /dev/null +++ b/test/integration/pool/test-connection-wait.js @@ -0,0 +1,15 @@ +var common = require('../../common'); +var assert = require('assert'); + +var pool = common.createPool({ + connectionLimit : 1, + maxWait: 1 +}); + +pool.getConnection(function(err, connection) { + pool.end(); + + if (err !== null) { + assert.equal(err.toString(), 'Error: Timed out (DB Connection)'); + } +}); diff --git a/test/integration/pool/test-destroy-connection.js b/test/integration/pool/test-destroy-connection.js index 2b4761685..898611c24 100644 --- a/test/integration/pool/test-destroy-connection.js +++ b/test/integration/pool/test-destroy-connection.js @@ -5,10 +5,9 @@ var pool = common.createPool(); pool.getConnection(function(err, connection) { if (err) throw err; - assert.strictEqual(connection, pool._allConnections[0]); connection.destroy(); - assert.ok(pool._allConnections.length == 0); + assert.ok(pool._allConnections.length() == 0); assert.ok(connection._poolRemoved); assert.strictEqual(connection.end, Connection.prototype.end); assert.strictEqual(connection.destroy, Connection.prototype.destroy); diff --git a/test/integration/pool/test-end-already-check.js b/test/integration/pool/test-end-already-check.js new file mode 100644 index 000000000..43e1dfe74 --- /dev/null +++ b/test/integration/pool/test-end-already-check.js @@ -0,0 +1,19 @@ +var common = require('../../common'); +var assert = require('assert'); +var Connection = require(common.lib + '/Connection'); + +var config = common.getTestConfig({ + initialSize: 2 +}); + +var pool = common.createPool(config); + +pool.on('initialized', function(poolSize) { + pool.end(function (err) { + assert.equal(err, null); + }); + + pool.end(function (err) { + assert.notEqual(err, null); + }); +}); diff --git a/test/integration/pool/test-get-maxwait.js b/test/integration/pool/test-get-maxwait.js new file mode 100644 index 000000000..00b93a56a --- /dev/null +++ b/test/integration/pool/test-get-maxwait.js @@ -0,0 +1,26 @@ +var common = require('../../common'); +var assert = require('assert'); +var Connection = require(common.lib + '/Connection'); + +var config = common.getTestConfig({ + connectionLimit: 1, + maxWait: 1000 +}); + +var pool = common.createPool(config); + +pool.getConnection(function (err, connection) { + if (err === null) { + setTimeout(function () { + connection.end(); + }, 2000); + } +}); + +pool.getConnection(function (err, connection) { + pool.end(); + + if (err !== null) { + assert.equal(err.toString(), 'Error: Timed out (getConnection)'); + } +}); diff --git a/test/integration/pool/test-initial-create.js b/test/integration/pool/test-initial-create.js new file mode 100644 index 000000000..ea61653b9 --- /dev/null +++ b/test/integration/pool/test-initial-create.js @@ -0,0 +1,25 @@ +var common = require('../../common'); +var assert = require('assert'); +var Connection = require(common.lib + '/Connection'); + +var config = common.getTestConfig({ + initialSize: 2 +}); + +var pool = common.createPool(config); + +var createdPoolCnt = 0; +pool.on('connection', function(connection) { + createdPoolCnt++; + console.log('# create connection'); +}); + +pool.on('initialized', function(poolSize) { + assert.equal(createdPoolCnt, poolSize); + assert.equal(poolSize, pool._allConnections.length()); +}); + +pool.getConnection(function(err, connection) { + console.log('> getConnection 1 : ' + err); + pool.end(); +}); \ No newline at end of file diff --git a/test/integration/pool/test-release-connection-reset.js b/test/integration/pool/test-release-connection-reset.js new file mode 100644 index 000000000..26b5c851c --- /dev/null +++ b/test/integration/pool/test-release-connection-reset.js @@ -0,0 +1,44 @@ +var common = require('../../common'); +var assert = require('assert'); +var Connection = require(common.lib + '/Connection'); + +var config = common.getTestConfig({ + connectionLimit: 1 +}); + +var pool = common.createPool(config); + +var callQuery1_2 = false; + +pool.getConnection(function(err, connection) { + if (err) throw err; + + console.log('# getConnection : 1'); + + connection.query('SELECT SLEEP(1)', function(err) { + console.log('> QUERY 1-1'); + + // release connection. (logic error occurs) + connection.end(); + }); + + connection.query('SELECT 1', function(err) { + // must not execute + callQuery1_2 = true; + + console.log('> QUERY 1-2'); + }); +}); + +pool.getConnection(function(err, connection) { + if (err) throw err; + + console.log('# getConnection : 2'); + + connection.query('SELECT 1', function(err) { + assert.ok(callQuery1_2 === false); + + console.log('> QUERY 2-1'); + pool.end(); + }); +});