|
| 1 | +/** |
| 2 | + * Copyright (c) 2010-2017 Brian Carlson ([email protected]) |
| 3 | + * All rights reserved. |
| 4 | + * |
| 5 | + * This source code is licensed under the MIT license found in the |
| 6 | + * README.md file in the root directory of this source tree. |
| 7 | + */ |
| 8 | + |
| 9 | +var Native = require('pg-native'); |
| 10 | +var TypeOverrides = require('../type-overrides'); |
| 11 | +var semver = require('semver'); |
| 12 | +var pkg = require('../../package.json'); |
| 13 | +var assert = require('assert'); |
| 14 | +var EventEmitter = require('events').EventEmitter; |
| 15 | +var util = require('util'); |
| 16 | +var ConnectionParameters = require('../connection-parameters'); |
| 17 | + |
| 18 | +var msg = 'Version >= ' + pkg.minNativeVersion + ' of pg-native required.'; |
| 19 | +assert(semver.gte(Native.version, pkg.minNativeVersion), msg); |
| 20 | + |
| 21 | +var NativeQuery = require('./query'); |
| 22 | + |
| 23 | +var Client = module.exports = function(config) { |
| 24 | + EventEmitter.call(this); |
| 25 | + config = config || {}; |
| 26 | + |
| 27 | + this._types = new TypeOverrides(config.types); |
| 28 | + |
| 29 | + this.native = new Native({ |
| 30 | + types: this._types |
| 31 | + }); |
| 32 | + |
| 33 | + this._queryQueue = []; |
| 34 | + this._connected = false; |
| 35 | + |
| 36 | + //keep these on the object for legacy reasons |
| 37 | + //for the time being. TODO: deprecate all this jazz |
| 38 | + var cp = this.connectionParameters = new ConnectionParameters(config); |
| 39 | + this.user = cp.user; |
| 40 | + this.password = cp.password; |
| 41 | + this.database = cp.database; |
| 42 | + this.host = cp.host; |
| 43 | + this.port = cp.port; |
| 44 | + |
| 45 | + //a hash to hold named queries |
| 46 | + this.namedQueries = {}; |
| 47 | +}; |
| 48 | + |
| 49 | +Client.Query = NativeQuery; |
| 50 | + |
| 51 | +util.inherits(Client, EventEmitter); |
| 52 | + |
| 53 | +//connect to the backend |
| 54 | +//pass an optional callback to be called once connected |
| 55 | +//or with an error if there was a connection error |
| 56 | +//if no callback is passed and there is a connection error |
| 57 | +//the client will emit an error event. |
| 58 | +Client.prototype.connect = function(cb) { |
| 59 | + var self = this; |
| 60 | + |
| 61 | + var onError = function(err) { |
| 62 | + if(cb) return cb(err); |
| 63 | + return self.emit('error', err); |
| 64 | + }; |
| 65 | + |
| 66 | + this.connectionParameters.getLibpqConnectionString(function(err, conString) { |
| 67 | + if(err) return onError(err); |
| 68 | + self.native.connect(conString, function(err) { |
| 69 | + if(err) return onError(err); |
| 70 | + |
| 71 | + //set internal states to connected |
| 72 | + self._connected = true; |
| 73 | + |
| 74 | + //handle connection errors from the native layer |
| 75 | + self.native.on('error', function(err) { |
| 76 | + //error will be handled by active query |
| 77 | + if(self._activeQuery && self._activeQuery.state != 'end') { |
| 78 | + return; |
| 79 | + } |
| 80 | + self.emit('error', err); |
| 81 | + }); |
| 82 | + |
| 83 | + self.native.on('notification', function(msg) { |
| 84 | + self.emit('notification', { |
| 85 | + channel: msg.relname, |
| 86 | + payload: msg.extra |
| 87 | + }); |
| 88 | + }); |
| 89 | + |
| 90 | + //signal we are connected now |
| 91 | + self.emit('connect'); |
| 92 | + self._pulseQueryQueue(true); |
| 93 | + |
| 94 | + //possibly call the optional callback |
| 95 | + if(cb) cb(); |
| 96 | + }); |
| 97 | + }); |
| 98 | +}; |
| 99 | + |
| 100 | +//send a query to the server |
| 101 | +//this method is highly overloaded to take |
| 102 | +//1) string query, optional array of parameters, optional function callback |
| 103 | +//2) object query with { |
| 104 | +// string query |
| 105 | +// optional array values, |
| 106 | +// optional function callback instead of as a separate parameter |
| 107 | +// optional string name to name & cache the query plan |
| 108 | +// optional string rowMode = 'array' for an array of results |
| 109 | +// } |
| 110 | +Client.prototype.query = function(config, values, callback) { |
| 111 | + if (typeof config.submit == 'function') { |
| 112 | + // accept query(new Query(...), (err, res) => { }) style |
| 113 | + if (typeof values == 'function') { |
| 114 | + config.callback = values; |
| 115 | + } |
| 116 | + this._queryQueue.push(config); |
| 117 | + this._pulseQueryQueue(); |
| 118 | + return config; |
| 119 | + } |
| 120 | + |
| 121 | + var conf = { }; |
| 122 | + |
| 123 | + //support query('text', ...) style calls |
| 124 | + if(typeof config == 'string') { |
| 125 | + conf.text = config; |
| 126 | + } |
| 127 | + |
| 128 | + //support passing everything in via a config object |
| 129 | + if(typeof config == 'object') { |
| 130 | + conf.text = config.text; |
| 131 | + conf.values = config.values; |
| 132 | + conf.name = config.name; |
| 133 | + conf.callback = config.callback; |
| 134 | + conf.rowMode = config.rowMode; |
| 135 | + } |
| 136 | + |
| 137 | + //support query({...}, function() {}) style calls |
| 138 | + //& support query(..., ['values'], ...) style calls |
| 139 | + if(typeof values == 'function') { |
| 140 | + conf.callback = values; |
| 141 | + } |
| 142 | + else if(util.isArray(values)) { |
| 143 | + conf.values = values; |
| 144 | + } |
| 145 | + if(typeof callback == 'function') { |
| 146 | + conf.callback = callback; |
| 147 | + } |
| 148 | + |
| 149 | + var query = new NativeQuery(conf); |
| 150 | + this._queryQueue.push(query); |
| 151 | + this._pulseQueryQueue(); |
| 152 | + return query; |
| 153 | +}; |
| 154 | + |
| 155 | +//disconnect from the backend server |
| 156 | +Client.prototype.end = function(cb) { |
| 157 | + var self = this; |
| 158 | + if(!this._connected) { |
| 159 | + this.once('connect', this.end.bind(this, cb)); |
| 160 | + } |
| 161 | + this.native.end(function() { |
| 162 | + //send an error to the active query |
| 163 | + if(self._hasActiveQuery()) { |
| 164 | + var msg = 'Connection terminated'; |
| 165 | + self._queryQueue.length = 0; |
| 166 | + self._activeQuery.handleError(new Error(msg)); |
| 167 | + } |
| 168 | + self.emit('end'); |
| 169 | + if(cb) cb(); |
| 170 | + }); |
| 171 | +}; |
| 172 | + |
| 173 | +Client.prototype._hasActiveQuery = function() { |
| 174 | + return this._activeQuery && this._activeQuery.state != 'error' && this._activeQuery.state != 'end'; |
| 175 | +}; |
| 176 | + |
| 177 | +Client.prototype._pulseQueryQueue = function(initialConnection) { |
| 178 | + if(!this._connected) { |
| 179 | + return; |
| 180 | + } |
| 181 | + if(this._hasActiveQuery()) { |
| 182 | + return; |
| 183 | + } |
| 184 | + var query = this._queryQueue.shift(); |
| 185 | + if(!query) { |
| 186 | + if(!initialConnection) { |
| 187 | + this.emit('drain'); |
| 188 | + } |
| 189 | + return; |
| 190 | + } |
| 191 | + this._activeQuery = query; |
| 192 | + query.submit(this); |
| 193 | + var self = this; |
| 194 | + query.once('_done', function() { |
| 195 | + self._pulseQueryQueue(); |
| 196 | + }); |
| 197 | +}; |
| 198 | + |
| 199 | +//attempt to cancel an in-progress query |
| 200 | +Client.prototype.cancel = function(query) { |
| 201 | + if(this._activeQuery == query) { |
| 202 | + this.native.cancel(function() {}); |
| 203 | + } else if (this._queryQueue.indexOf(query) != -1) { |
| 204 | + this._queryQueue.splice(this._queryQueue.indexOf(query), 1); |
| 205 | + } |
| 206 | +}; |
| 207 | + |
| 208 | +Client.prototype.setTypeParser = function(oid, format, parseFn) { |
| 209 | + return this._types.setTypeParser(oid, format, parseFn); |
| 210 | +}; |
| 211 | + |
| 212 | +Client.prototype.getTypeParser = function(oid, format) { |
| 213 | + return this._types.getTypeParser(oid, format); |
| 214 | +}; |
0 commit comments