Skip to content

Commit 1437fc3

Browse files
author
Kibae Shin
committed
- Support for Logical Replication Stream
- Added WalStream class
1 parent d3c01b1 commit 1437fc3

File tree

3 files changed

+97
-0
lines changed

3 files changed

+97
-0
lines changed

lib/connection.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,9 @@ Connection.prototype.parseMessage = function(buffer) {
425425
case 0x48: //H
426426
return this.parseH(buffer, length);
427427

428+
case 0x57: //W
429+
return new Message('replicationStart', length);
430+
428431
case 0x63: //c
429432
return new Message('copyDone', length);
430433

lib/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ var defaults = require('./defaults');
1313
var Connection = require('./connection');
1414
var ConnectionParameters = require('./connection-parameters');
1515
var poolFactory = require('./pool-factory');
16+
var WalStream = require('./wallstream');
1617

1718
var PG = function(clientConstructor) {
1819
EventEmitter.call(this);
@@ -23,6 +24,7 @@ var PG = function(clientConstructor) {
2324
this._pools = [];
2425
this.Connection = Connection;
2526
this.types = require('pg-types');
27+
this.WalStream = WalStream;
2628
};
2729

2830
util.inherits(PG, EventEmitter);

lib/wallstream.js

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/**
2+
* Copyright (c) 2017 Kibae Shin ([email protected])
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the MIT license found in the
6+
* README.md file in the root directory of this source tree.
7+
*/
8+
var EventEmitter = require('events').EventEmitter;
9+
var Client = require('./client');
10+
var util = require('util');
11+
12+
var WalStream = function(config) {
13+
EventEmitter.call(this);
14+
15+
var c = config || {};
16+
c.replication = 'database';
17+
18+
var self = this;
19+
20+
var client;
21+
var stoped = false;
22+
this.getChanges = function(slotName, uptoLsn, option, cb /*(err)*/) {
23+
option = option || {};
24+
/*
25+
* includeXids : include xid on BEGIN and COMMIT, default false
26+
* includeTimestamp : include timestamp on COMMIT, default false
27+
* skipEmptyXacts : skip empty transaction like DDL, default true
28+
*/
29+
30+
stoped = false;
31+
client = new Client(config);
32+
33+
client.on('error', function(err) {
34+
self.emit('error', err);
35+
});
36+
37+
client.connect(function(err) {
38+
//error handling
39+
if (err) {
40+
self.emit('error', err);
41+
return;
42+
}
43+
44+
var sql = 'START_REPLICATION SLOT ' + slotName + ' LOGICAL ' + (uptoLsn ? uptoLsn : '0/00000000');
45+
var opts = [
46+
'"include-xids" \'' + (option.includeXids === true ? 'on' : 'off') + '\'',
47+
'"include-timestamp" \'' + (option.includeTimestamp === true ? 'on' : 'off') + '\'',
48+
'"skip-empty-xacts" \'' + (option.skipEmptyXacts !== false ? 'on' : 'off') + '\'',
49+
];
50+
sql += ' (' + (opts.join(' , ')) + ')';
51+
52+
client.query(sql, function(err) {
53+
if (err) {
54+
if (!stoped && cb) {
55+
cb(err);
56+
cb = null;
57+
}
58+
}
59+
cb = null;
60+
});
61+
62+
client.connection.once('replicationStart', function() {
63+
//start
64+
self.emit('start', self);
65+
client.connection.on('copyData', function(msg) {
66+
if (msg.chunk[0] != 0x77) {
67+
return;
68+
}
69+
70+
var lsn = (msg.chunk.readUInt32BE(1).toString(16).toUpperCase()) + '/' + (msg.chunk.readUInt32BE(5).toString(16).toUpperCase());
71+
self.emit('data', {
72+
lsn: lsn,
73+
log: msg.chunk.slice(25),
74+
});
75+
});
76+
});
77+
});
78+
return self;
79+
};
80+
81+
this.stop = function() {
82+
stoped = true;
83+
if (client) {
84+
client.end();
85+
client = null;
86+
}
87+
};
88+
};
89+
90+
util.inherits(WalStream, EventEmitter);
91+
92+
module.exports = WalStream;

0 commit comments

Comments
 (0)