Skip to content

Commit 70ce491

Browse files
Merge pull request #2 from yugabyte/fallback
Added Fallback for TALB and Refresh Interval Connection Parameter
2 parents d4ddb35 + c977487 commit 70ce491

File tree

4 files changed

+164
-87
lines changed

4 files changed

+164
-87
lines changed

README.md

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,73 @@
44
<span class="badge-npmversion"><a href="https://npmjs.org/package/pg" title="View this project on NPM"><img src="https://img.shields.io/npm/v/pg.svg" alt="NPM version" /></a></span>
55
<span class="badge-npmdownloads"><a href="https://npmjs.org/package/pg" title="View this project on NPM"><img src="https://img.shields.io/npm/dm/pg.svg" alt="NPM downloads" /></a></span>
66

7-
This is a fork of [node-postgres](https://github.com/brianc/node-postgres) which includes smart feature like Cluster Aware and Topology Aware load balancing. To know more visit the [docs page](https://docs.yugabyte.com/preview/drivers-orms/).
7+
This is a fork of [node-postgres](https://github.com/brianc/node-postgres) which includes the following additional features:
8+
9+
# Connection load balancing
10+
11+
Users can use this feature in two configurations.
12+
13+
### Cluster-aware / Uniform connection load balancing
14+
15+
In the cluster-aware connection load balancing, connections are distributed across all the tservers in the cluster, irrespective of their placements.
16+
17+
To enable the cluster-aware connection load balancing, provide the parameter `loadBalance` set to true as `loadBalance=true` in the connection url or the connection string (DSN style).
18+
19+
```
20+
"postgresql://username:password@localhost:5433/database_name?loadBalance=true"
21+
```
22+
23+
With this parameter specified in the url, the driver will fetch and maintain the list of tservers from the given endpoint (`localhost` in above example) available in the YugabyteDB cluster and distribute the connections equally across them.
24+
25+
This list is refreshed every 5 minutes, when a new connection request is received.
26+
27+
Application needs to use the same connection url to create every connection it needs, so that the distribution happens equally.
28+
29+
### Topology-aware connection load balancing
30+
31+
With topology-aware connnection load balancing, users can target tservers in specific zones by specifying these zones as `topologyKeys` with values in the format `cloudname.regionname.zonename`. Multiple zones can be specified as comma separated values.
32+
33+
The connections will be distributed equally with the tservers in these zones.
34+
35+
Note that, you would still need to specify `loadBalance=true` to enable the topology-aware connection load balancing.
36+
37+
```
38+
"postgresql://username:password@localhost:5433/database_name?loadBalance=true&topologyKeys=cloud1.region1.zone1,cloud1.region1.zone2"
39+
```
40+
### Specifying fallback zones
41+
42+
For topology-aware load balancing, you can now specify fallback placements too. This is not applicable for cluster-aware load balancing.
43+
Each placement value can be suffixed with a colon (`:`) followed by a preference value between 1 and 10.
44+
A preference value of `:1` means it is a primary placement. A preference value of `:2` means it is the first fallback placement and so on.If no preference value is provided, it is considered to be a primary placement (equivalent to one with preference value `:1`). Example given below.
45+
46+
```
47+
"postgres://username:password@localhost:5433/database_name?loadBalance=true&topologyKeys=cloud1.region1.zone1:1,cloud1.region1.zone2:2";
48+
```
49+
50+
You can also use `*` for specifying all the zones in a given region as shown below. This is not allowed for cloud or region values.
51+
52+
```
53+
"postgres://username:password@localhost:5433/database_name?loadBalance=true&topologyKeys=cloud1.region1.*:1,cloud1.region2.*:2";
54+
```
55+
56+
The driver attempts to connect to a node in following order: the least loaded node in the 1) primary placement(s), else in the 2) first fallback if specified, else in the 3) second fallback if specified and so on.
57+
If no nodes are available either in primary placement(s) or in any of the fallback placements, then nodes in the rest of the cluster are attempted.
58+
And this repeats for each connection request.
59+
60+
## Specifying Refresh Interval
61+
62+
Users can specify Refresh Time Interval, in seconds. It is the time interval between two attempts to refresh the information about cluster nodes. Default is 300. Valid values are integers between 0 and 600. Value 0 means refresh for each connection request. Any value outside this range is ignored and the default is used.
63+
64+
To specify Refresh Interval, use the parameter `ybServersRefreshInterval` in the connection url or the connection string.
65+
66+
```
67+
"postgres://username:password@localhost:5433/database_name?ybServersRefreshInterval=X&loadBalance=true&topologyKeys=cloud1.region1.*:1,cloud1.region2.*:2";
68+
```
69+
Here, X is the value of the refresh interval (seconds) in integer.
70+
71+
To know more visit the [docs page](https://docs.yugabyte.com/preview/drivers-orms/).
72+
73+
For a working example which demonstrates the configurations of connection load balancing see the [driver-examples](https://github.com/yugabyte/driver-examples/tree/main/nodejs) repository.
874

975
Non-blocking PostgreSQL client for Node.js. Pure JavaScript and optional native libpq bindings.
1076

packages/pg/lib/client.js

Lines changed: 85 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class Client extends EventEmitter {
6262
this.host = this.connectionParameters.host
6363
this.loadBalance = this.connectionParameters.loadBalance
6464
this.topologyKeys = this.connectionParameters.topologyKeys
65+
this.ybServersRefreshInterval = this.connectionParameters.ybServersRefreshInterval
6566
this.connectionString = config
6667
// "hiding" the password so it doesn't show up in stack traces
6768
// or if the client is console.logged
@@ -127,12 +128,8 @@ class Client extends EventEmitter {
127128
static hostServerInfo = new Map()
128129
// Boolean to check if public IP needs to be used or not
129130
static usePublic = false
130-
// Set of topology Keys provided in URL
131-
static topologyKeySet = new Set()
132-
// time to refresh the ServerMetaData
133-
static REFRESING_TIME = 300 // secs
134-
// Boolean to Refresh ServerMetaData manually (for testing purpose).
135-
static doHardRefresh = false
131+
// Map of topology Keys provided in URL
132+
static topologyKeyMap = new Map()
136133

137134
_errorAllQueries(err) {
138135
const enqueueError = (query) => {
@@ -156,32 +153,56 @@ class Client extends EventEmitter {
156153
}
157154
let minConnectionCount = Number.MAX_VALUE
158155
let leastLoadedHosts = []
159-
let hosts = hostsList.keys()
160-
for (let value of hosts) {
161-
let host = value
162-
if (this.connectionParameters.topologyKeys !== '') {
156+
for (var i = 1; i <= Client.topologyKeyMap.size; i++) {
157+
let hosts = hostsList.keys()
158+
for (let value of hosts) {
159+
let host = value
163160
let placementInfoOfHost
164-
if (!this.checkConnectionMapEmpty()) {
161+
if (Client.hostServerInfo.has(host)) {
165162
placementInfoOfHost = Client.hostServerInfo.get(host).placementInfo
166163
} else {
167164
placementInfoOfHost = hostsList.get(host).placementInfo
168165
}
169-
if (!Client.topologyKeySet.has(placementInfoOfHost)) {
166+
var toCheckStar = placementInfoOfHost.split('.')
167+
var starPlacementInfoOfHost = toCheckStar[0]+"."+toCheckStar[1]+".*"
168+
if (!Client.topologyKeyMap.get(i).includes(placementInfoOfHost) && !Client.topologyKeyMap.get(i).includes(starPlacementInfoOfHost)) {
170169
continue
171170
}
171+
let hostCount
172+
if (typeof hostsList.get(host) === 'object') {
173+
hostCount = 0
174+
} else {
175+
hostCount = hostsList.get(host)
176+
}
177+
if (minConnectionCount > hostCount) {
178+
leastLoadedHosts = []
179+
minConnectionCount = hostCount
180+
leastLoadedHosts.push(host)
181+
} else if (minConnectionCount === hostCount) {
182+
leastLoadedHosts.push(host)
183+
}
172184
}
173-
let hostCount
174-
if (typeof hostsList.get(host) === 'object') {
175-
hostCount = 0
176-
} else {
177-
hostCount = hostsList.get(host)
185+
if(leastLoadedHosts.length != 0){
186+
break
178187
}
179-
if (minConnectionCount > hostCount) {
180-
leastLoadedHosts = []
181-
minConnectionCount = hostCount
182-
leastLoadedHosts.push(host)
183-
} else if (minConnectionCount === hostCount) {
184-
leastLoadedHosts.push(host)
188+
}
189+
190+
if (leastLoadedHosts.length === 0) {
191+
let hosts = hostsList.keys()
192+
for (let value of hosts) {
193+
let hostCount
194+
if (typeof hostsList.get(value) === 'object') {
195+
hostCount = 0
196+
} else {
197+
hostCount = hostsList.get(value)
198+
}
199+
if (minConnectionCount > hostCount) {
200+
leastLoadedHosts = []
201+
minConnectionCount = hostCount
202+
leastLoadedHosts.push(value)
203+
} else if (minConnectionCount === hostCount) {
204+
leastLoadedHosts.push(value)
205+
}
185206
}
186207
}
187208
if (leastLoadedHosts.length === 0) {
@@ -193,11 +214,33 @@ class Client extends EventEmitter {
193214
}
194215

195216
isValidKey(key) {
196-
var keyParts = key.split('.')
217+
var zones = key.split(':')
218+
if (zones.length == 0 || zones.length >2) {
219+
return false
220+
}
221+
var keyParts = zones[0].split('.')
197222
if (keyParts.length !== 3) {
198223
return false
199224
}
200-
return Client.placementInfoHostMap.has(key)
225+
if (zones[1]==undefined) {
226+
zones[1]='1'
227+
}
228+
zones[1]=Number(zones[1])
229+
if (zones[1]<1 || zones[1]>10 || isNaN(zones[1]) || !Number.isInteger(zones[1])) {
230+
return false
231+
}
232+
if (keyParts[2]!="*") {
233+
return Client.placementInfoHostMap.has(zones[0])
234+
} else {
235+
var allPlacementInfo = Client.placementInfoHostMap.keys();
236+
for(let placeInfo of allPlacementInfo){
237+
var placeInfoParts = placeInfo.split('.')
238+
if(keyParts[0]==placeInfoParts[0] && keyParts[1]==placeInfoParts[1]){
239+
return true
240+
}
241+
}
242+
}
243+
return false
201244
}
202245

203246
incrementConnectionCount() {
@@ -249,7 +292,7 @@ class Client extends EventEmitter {
249292
}, this._connectionTimeoutMillis)
250293
}
251294
if (this.connectionParameters.loadBalance) {
252-
if (!this.checkConnectionMapEmpty() && Client.hostServerInfo.size) {
295+
if (Client.connectionMap.size && Client.hostServerInfo.size) {
253296
this.host = this.getLeastLoadedServer(Client.connectionMap)
254297
this.port = Client.hostServerInfo.get(this.host).port
255298
} else if (Client.failedHosts.size) {
@@ -442,12 +485,23 @@ class Client extends EventEmitter {
442485
})
443486
}
444487

445-
createTopologyKeySet() {
488+
createTopologyKeyMap() {
446489
var seperatedKeys = this.connectionParameters.topologyKeys.split(',')
447490
for (let idx = 0; idx < seperatedKeys.length; idx++) {
448491
let key = seperatedKeys[idx]
449492
if (this.isValidKey(key)) {
450-
Client.topologyKeySet.add(key)
493+
var zones = key.split(':')
494+
if (zones[1]==undefined) {
495+
zones[1]='1'
496+
}
497+
zones[1]=parseInt(zones[1])
498+
if (Client.topologyKeyMap.has(zones[1])) {
499+
let currentzones = Client.topologyKeyMap.get(zones[1])
500+
currentzones.push(zones[0])
501+
Client.topologyKeyMap.set(zones[1],currentzones)
502+
} else {
503+
Client.topologyKeyMap.set(zones[1],[zones[0]])
504+
}
451505
} else {
452506
throw new Error('Bad Topology Key found - ' + key)
453507
}
@@ -459,22 +513,8 @@ class Client extends EventEmitter {
459513
Client.lastTimeMetaDataFetched = new Date().getTime() / 1000
460514
this.createConnectionMap(data)
461515
if (this.connectionParameters.topologyKeys !== '') {
462-
this.createTopologyKeySet()
463-
}
464-
}
465-
466-
checkConnectionMapEmpty() {
467-
if (this.connectionParameters.topologyKeys === '') {
468-
return Client.connectionMap.size === 0
469-
}
470-
let hosts = Client.connectionMap.keys()
471-
for (let value of hosts) {
472-
let placementInfo = Client.hostServerInfo.get(value).placementInfo
473-
if (Client.topologyKeySet.has(placementInfo)) {
474-
return false
475-
}
516+
this.createTopologyKeyMap()
476517
}
477-
return true
478518
}
479519

480520
nowConnect(callback) {
@@ -490,27 +530,6 @@ class Client extends EventEmitter {
490530
} else if (Client.failedHosts.has(this.host)) {
491531
Client.failedHosts.delete(this.host)
492532
}
493-
if (this.checkConnectionMapEmpty() && Client.failedHosts.size === 0) {
494-
lock.release()
495-
// try with url host and mark that connection type as non-loadBalanced
496-
this.host = this.urlHost
497-
this.connectionParameters.host = this.host
498-
this.connectionParameters.loadBalance = false
499-
this.connection =
500-
this.config.connection ||
501-
new Connection({
502-
stream: this.config.stream,
503-
ssl: this.connectionParameters.ssl,
504-
keepAlive: this.config.keepAlive || false,
505-
keepAliveInitialDelayMillis: this.config.keepAliveInitialDelayMillis || 0,
506-
encoding: this.connectionParameters.client_encoding || 'utf8',
507-
})
508-
this._connecting = false
509-
Client.hostServerInfo.clear()
510-
Client.connectionMap.clear()
511-
this.connect(callback)
512-
return
513-
}
514533
lock.release()
515534
this.connect(callback)
516535
} else {
@@ -543,26 +562,6 @@ class Client extends EventEmitter {
543562
} else if (Client.failedHosts.has(this.host)) {
544563
Client.failedHosts.delete(this.host)
545564
}
546-
if (this.checkConnectionMapEmpty() && Client.failedHosts.size === 0) {
547-
lock.release()
548-
this.host = this.urlHost
549-
this.connectionParameters.host = this.host
550-
this.connectionParameters.loadBalance = false
551-
this.connection =
552-
this.config.connection ||
553-
new Connection({
554-
stream: this.config.stream,
555-
ssl: this.connectionParameters.ssl,
556-
keepAlive: this.config.keepAlive || false,
557-
keepAliveInitialDelayMillis: this.config.keepAliveInitialDelayMillis || 0,
558-
encoding: this.connectionParameters.client_encoding || 'utf8',
559-
})
560-
this._connecting = false
561-
Client.hostServerInfo.clear()
562-
Client.connectionMap.clear()
563-
this.connect(callback)
564-
return
565-
}
566565
lock.release()
567566
this.connect(callback)
568567
} else {
@@ -605,7 +604,7 @@ class Client extends EventEmitter {
605604
isRefreshRequired() {
606605
let currentTime = new Date().getTime() / 1000
607606
let diff = Math.floor(currentTime - Client.lastTimeMetaDataFetched)
608-
return diff >= Client.REFRESING_TIME
607+
return diff >= this.connectionParameters.ybServersRefreshInterval
609608
}
610609

611610
connect(callback) {
@@ -636,7 +635,7 @@ class Client extends EventEmitter {
636635
return this.nowConnect(callback)
637636
})
638637
} else {
639-
if (this.isRefreshRequired() || Client.doHardRefresh) {
638+
if (this.isRefreshRequired()) {
640639
this.getServersInfo()
641640
.then((res) => {
642641
this.updateMetaData(res.rows)

packages/pg/lib/connection-parameters.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class ConnectionParameters {
9595
}
9696
this.loadBalance = val('loadBalance', config)
9797
this.topologyKeys = val('topologyKeys', config)
98+
this.ybServersRefreshInterval = val('ybServersRefreshInterval', config)
9899

99100
if (typeof this.loadBalance === 'string') {
100101
this.loadBalance = this.loadBalance === 'true'
@@ -104,6 +105,13 @@ class ConnectionParameters {
104105
throw new Error(' You need to enable Load Balance feature to use Topology Aware! ')
105106
}
106107
}
108+
this.ybServersRefreshInterval = Number(this.ybServersRefreshInterval)
109+
if(isNaN(this.ybServersRefreshInterval) || !Number.isInteger(this.ybServersRefreshInterval)){
110+
throw new Error(' You need to Enter valid Refresh Interval ')
111+
}
112+
if(this.ybServersRefreshInterval<0 || this.ybServersRefreshInterval>600){
113+
this.ybServersRefreshInterval = 300
114+
}
107115
this.client_encoding = val('client_encoding', config)
108116
this.replication = val('replication', config)
109117
// a domain socket begins with '/'
@@ -143,6 +151,7 @@ class ConnectionParameters {
143151
add(params, this, 'options')
144152
add(params, this, 'loadBalance')
145153
add(params, this, 'topologyKeys')
154+
add(params, this, 'ybServersRefreshInterval')
146155

147156
var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {}
148157
add(params, ssl, 'sslmode')

packages/pg/lib/defaults.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ module.exports = {
2727
// Topology keys
2828
topologyKeys: '',
2929

30+
// Refresh Interval
31+
ybServersRefreshInterval: 300,
32+
3033
// number of rows to return at a time from a prepared statement's
3134
// portal. 0 will return all rows at once
3235
rows: 0,

0 commit comments

Comments
 (0)