Skip to content

Commit 47b08d3

Browse files
Added Fallback for Topology Aware Load Balancing and Refresh Interval Parameter
1 parent d4ddb35 commit 47b08d3

File tree

3 files changed

+90
-87
lines changed

3 files changed

+90
-87
lines changed

packages/pg/lib/client.js

Lines changed: 78 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class Client extends EventEmitter {
6262
this.host = this.connectionParameters.host
6363
this.loadBalance = this.connectionParameters.loadBalance
6464
this.topologyKeys = this.connectionParameters.topologyKeys
65+
this.ybServersRefreshInterval = this.connectionParameters.ybServersRefreshInterval
6566
this.connectionString = config
6667
// "hiding" the password so it doesn't show up in stack traces
6768
// or if the client is console.logged
@@ -127,12 +128,8 @@ class Client extends EventEmitter {
127128
static hostServerInfo = new Map()
128129
// Boolean to check if public IP needs to be used or not
129130
static usePublic = false
130-
// Set of topology Keys provided in URL
131-
static topologyKeySet = new Set()
132-
// time to refresh the ServerMetaData
133-
static REFRESING_TIME = 300 // secs
134-
// Boolean to Refresh ServerMetaData manually (for testing purpose).
135-
static doHardRefresh = false
131+
// Map of topology Keys provided in URL
132+
static topologyKeyMap = new Map()
136133

137134
_errorAllQueries(err) {
138135
const enqueueError = (query) => {
@@ -156,32 +153,56 @@ class Client extends EventEmitter {
156153
}
157154
let minConnectionCount = Number.MAX_VALUE
158155
let leastLoadedHosts = []
159-
let hosts = hostsList.keys()
160-
for (let value of hosts) {
161-
let host = value
162-
if (this.connectionParameters.topologyKeys !== '') {
156+
for (var i = 1; i <= Client.topologyKeyMap.size; i++) {
157+
let hosts = hostsList.keys()
158+
for (let value of hosts) {
159+
let host = value
163160
let placementInfoOfHost
164-
if (!this.checkConnectionMapEmpty()) {
161+
if(Client.hostServerInfo.has(host)){
165162
placementInfoOfHost = Client.hostServerInfo.get(host).placementInfo
166-
} else {
163+
}else{
167164
placementInfoOfHost = hostsList.get(host).placementInfo
168165
}
169-
if (!Client.topologyKeySet.has(placementInfoOfHost)) {
166+
var toCheckStar = placementInfoOfHost.split('.')
167+
var StarplacementInfoOfHost = toCheckStar[0]+"."+toCheckStar[1]+".*"
168+
if (!Client.topologyKeyMap.get(i).includes(placementInfoOfHost) && !Client.topologyKeyMap.get(i).includes(StarplacementInfoOfHost) ) {
170169
continue
171170
}
171+
let hostCount
172+
if (typeof hostsList.get(host) === 'object') {
173+
hostCount = 0
174+
} else {
175+
hostCount = hostsList.get(host)
176+
}
177+
if (minConnectionCount > hostCount) {
178+
leastLoadedHosts = []
179+
minConnectionCount = hostCount
180+
leastLoadedHosts.push(host)
181+
} else if (minConnectionCount === hostCount) {
182+
leastLoadedHosts.push(host)
183+
}
172184
}
173-
let hostCount
174-
if (typeof hostsList.get(host) === 'object') {
175-
hostCount = 0
176-
} else {
177-
hostCount = hostsList.get(host)
185+
if(leastLoadedHosts.length != 0){
186+
break
178187
}
179-
if (minConnectionCount > hostCount) {
180-
leastLoadedHosts = []
181-
minConnectionCount = hostCount
182-
leastLoadedHosts.push(host)
183-
} else if (minConnectionCount === hostCount) {
184-
leastLoadedHosts.push(host)
188+
}
189+
190+
if (leastLoadedHosts.length === 0) {
191+
let hosts = hostsList.keys()
192+
for (let value of hosts) {
193+
let hostCount
194+
if (typeof hostsList.get(value) === 'object') {
195+
hostCount = 0
196+
} else {
197+
hostCount = hostsList.get(value)
198+
}
199+
if (minConnectionCount > hostCount) {
200+
leastLoadedHosts = []
201+
minConnectionCount = hostCount
202+
leastLoadedHosts.push(value)
203+
} else if (minConnectionCount === hostCount) {
204+
leastLoadedHosts.push(value)
205+
}
185206
}
186207
}
187208
if (leastLoadedHosts.length === 0) {
@@ -193,11 +214,25 @@ class Client extends EventEmitter {
193214
}
194215

195216
isValidKey(key) {
196-
var keyParts = key.split('.')
217+
var zones = key.split(':')
218+
if(zones.length == 0 || zones.length >2){
219+
return false
220+
}
221+
var keyParts = zones[0].split('.')
197222
if (keyParts.length !== 3) {
198223
return false
199224
}
200-
return Client.placementInfoHostMap.has(key)
225+
if(zones[1]==undefined){
226+
zones[1]='1'
227+
}
228+
zones[1]=Number(zones[1])
229+
if(zones[1]<1 || zones[1]>10 || isNaN(zones[1]) || !Number.isInteger(zones[1])){
230+
return false
231+
}
232+
if(keyParts[2]!="*"){
233+
return Client.placementInfoHostMap.has(zones[0])
234+
}
235+
return true
201236
}
202237

203238
incrementConnectionCount() {
@@ -249,7 +284,7 @@ class Client extends EventEmitter {
249284
}, this._connectionTimeoutMillis)
250285
}
251286
if (this.connectionParameters.loadBalance) {
252-
if (!this.checkConnectionMapEmpty() && Client.hostServerInfo.size) {
287+
if (Client.connectionMap.size && Client.hostServerInfo.size) {
253288
this.host = this.getLeastLoadedServer(Client.connectionMap)
254289
this.port = Client.hostServerInfo.get(this.host).port
255290
} else if (Client.failedHosts.size) {
@@ -442,12 +477,23 @@ class Client extends EventEmitter {
442477
})
443478
}
444479

445-
createTopologyKeySet() {
480+
createTopologyKeyMap() {
446481
var seperatedKeys = this.connectionParameters.topologyKeys.split(',')
447482
for (let idx = 0; idx < seperatedKeys.length; idx++) {
448483
let key = seperatedKeys[idx]
449484
if (this.isValidKey(key)) {
450-
Client.topologyKeySet.add(key)
485+
var zones = key.split(':')
486+
if(zones[1]==undefined){
487+
zones[1]='1'
488+
}
489+
zones[1]=parseInt(zones[1])
490+
if (Client.topologyKeyMap.has(zones[1])) {
491+
let currentzones = Client.topologyKeyMap.get(zones[1])
492+
currentzones.push(zones[0])
493+
Client.topologyKeyMap.set(zones[1],currentzones)
494+
} else {
495+
Client.topologyKeyMap.set(zones[1],[zones[0]])
496+
}
451497
} else {
452498
throw new Error('Bad Topology Key found - ' + key)
453499
}
@@ -459,22 +505,8 @@ class Client extends EventEmitter {
459505
Client.lastTimeMetaDataFetched = new Date().getTime() / 1000
460506
this.createConnectionMap(data)
461507
if (this.connectionParameters.topologyKeys !== '') {
462-
this.createTopologyKeySet()
463-
}
464-
}
465-
466-
checkConnectionMapEmpty() {
467-
if (this.connectionParameters.topologyKeys === '') {
468-
return Client.connectionMap.size === 0
508+
this.createTopologyKeyMap()
469509
}
470-
let hosts = Client.connectionMap.keys()
471-
for (let value of hosts) {
472-
let placementInfo = Client.hostServerInfo.get(value).placementInfo
473-
if (Client.topologyKeySet.has(placementInfo)) {
474-
return false
475-
}
476-
}
477-
return true
478510
}
479511

480512
nowConnect(callback) {
@@ -490,27 +522,6 @@ class Client extends EventEmitter {
490522
} else if (Client.failedHosts.has(this.host)) {
491523
Client.failedHosts.delete(this.host)
492524
}
493-
if (this.checkConnectionMapEmpty() && Client.failedHosts.size === 0) {
494-
lock.release()
495-
// try with url host and mark that connection type as non-loadBalanced
496-
this.host = this.urlHost
497-
this.connectionParameters.host = this.host
498-
this.connectionParameters.loadBalance = false
499-
this.connection =
500-
this.config.connection ||
501-
new Connection({
502-
stream: this.config.stream,
503-
ssl: this.connectionParameters.ssl,
504-
keepAlive: this.config.keepAlive || false,
505-
keepAliveInitialDelayMillis: this.config.keepAliveInitialDelayMillis || 0,
506-
encoding: this.connectionParameters.client_encoding || 'utf8',
507-
})
508-
this._connecting = false
509-
Client.hostServerInfo.clear()
510-
Client.connectionMap.clear()
511-
this.connect(callback)
512-
return
513-
}
514525
lock.release()
515526
this.connect(callback)
516527
} else {
@@ -543,26 +554,6 @@ class Client extends EventEmitter {
543554
} else if (Client.failedHosts.has(this.host)) {
544555
Client.failedHosts.delete(this.host)
545556
}
546-
if (this.checkConnectionMapEmpty() && Client.failedHosts.size === 0) {
547-
lock.release()
548-
this.host = this.urlHost
549-
this.connectionParameters.host = this.host
550-
this.connectionParameters.loadBalance = false
551-
this.connection =
552-
this.config.connection ||
553-
new Connection({
554-
stream: this.config.stream,
555-
ssl: this.connectionParameters.ssl,
556-
keepAlive: this.config.keepAlive || false,
557-
keepAliveInitialDelayMillis: this.config.keepAliveInitialDelayMillis || 0,
558-
encoding: this.connectionParameters.client_encoding || 'utf8',
559-
})
560-
this._connecting = false
561-
Client.hostServerInfo.clear()
562-
Client.connectionMap.clear()
563-
this.connect(callback)
564-
return
565-
}
566557
lock.release()
567558
this.connect(callback)
568559
} else {
@@ -605,7 +596,7 @@ class Client extends EventEmitter {
605596
isRefreshRequired() {
606597
let currentTime = new Date().getTime() / 1000
607598
let diff = Math.floor(currentTime - Client.lastTimeMetaDataFetched)
608-
return diff >= Client.REFRESING_TIME
599+
return diff >= this.connectionParameters.ybServersRefreshInterval
609600
}
610601

611602
connect(callback) {
@@ -636,7 +627,7 @@ class Client extends EventEmitter {
636627
return this.nowConnect(callback)
637628
})
638629
} else {
639-
if (this.isRefreshRequired() || Client.doHardRefresh) {
630+
if (this.isRefreshRequired()) {
640631
this.getServersInfo()
641632
.then((res) => {
642633
this.updateMetaData(res.rows)

packages/pg/lib/connection-parameters.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class ConnectionParameters {
9595
}
9696
this.loadBalance = val('loadBalance', config)
9797
this.topologyKeys = val('topologyKeys', config)
98+
this.ybServersRefreshInterval = val('ybServersRefreshInterval', config)
9899

99100
if (typeof this.loadBalance === 'string') {
100101
this.loadBalance = this.loadBalance === 'true'
@@ -104,6 +105,13 @@ class ConnectionParameters {
104105
throw new Error(' You need to enable Load Balance feature to use Topology Aware! ')
105106
}
106107
}
108+
this.ybServersRefreshInterval = Number(this.ybServersRefreshInterval)
109+
if(isNaN(this.ybServersRefreshInterval) || !Number.isInteger(this.ybServersRefreshInterval)){
110+
throw new Error(' You need to Enter valid Refresh Interval ')
111+
}
112+
if(this.ybServersRefreshInterval<0 || this.ybServersRefreshInterval>600){
113+
this.ybServersRefreshInterval = 300
114+
}
107115
this.client_encoding = val('client_encoding', config)
108116
this.replication = val('replication', config)
109117
// a domain socket begins with '/'
@@ -143,6 +151,7 @@ class ConnectionParameters {
143151
add(params, this, 'options')
144152
add(params, this, 'loadBalance')
145153
add(params, this, 'topologyKeys')
154+
add(params, this, 'ybServersRefreshInterval')
146155

147156
var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {}
148157
add(params, ssl, 'sslmode')

packages/pg/lib/defaults.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ module.exports = {
2727
// Topology keys
2828
topologyKeys: '',
2929

30+
// Refresh Interval
31+
ybServersRefreshInterval: 300,
32+
3033
// number of rows to return at a time from a prepared statement's
3134
// portal. 0 will return all rows at once
3235
rows: 0,

0 commit comments

Comments
 (0)