Skip to content

Commit c014096

Browse files
antonbrianc
authored andcommitted
COPY TO/FROM native/libpq done. Looks like it works, but need to test
1 parent d00e534 commit c014096

File tree

7 files changed

+363
-17
lines changed

7 files changed

+363
-17
lines changed

lib/client.js

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ var Query = require(__dirname + '/query');
66
var utils = require(__dirname + '/utils');
77
var defaults = require(__dirname + '/defaults');
88
var Connection = require(__dirname + '/connection');
9-
9+
var CopyFromStream = require(__dirname + '/copystream').CopyFromStream;
10+
var CopyToStream = require(__dirname + '/copystream').CopyToStream;
1011
var Client = function(config) {
1112
EventEmitter.call(this);
1213
if(typeof config === 'string') {
@@ -104,7 +105,12 @@ p.connect = function(callback) {
104105
con.sync();
105106
}
106107
});
107-
108+
con.on('copyInResponse', function(msg) {
109+
self.activeQuery.streamData(self.connection);
110+
});
111+
con.on('copyData', function (msg) {
112+
self.activeQuery.handleCopyFromChunk(msg.chunk);
113+
});
108114
if (!callback) {
109115
self.emit('connect');
110116
} else {
@@ -184,7 +190,30 @@ p._pulseQueryQueue = function() {
184190
}
185191
}
186192
};
193+
p._copy = function (text, stream) {
194+
var config = {},
195+
query;
196+
config.text = text;
197+
config.stream = stream;
198+
config.callback = function (error) {
199+
if (error) {
200+
config.stream.error(error);
201+
} else {
202+
config.stream.close();
203+
}
204+
}
205+
query = new Query(config);
206+
this.queryQueue.push(query);
207+
this._pulseQueryQueue();
208+
return config.stream;
187209

210+
};
211+
p.copyFrom = function (text) {
212+
return this._copy(text, new CopyFromStream());
213+
}
214+
p.copyTo = function (text) {
215+
return this._copy(text, new CopyToStream());
216+
}
188217
p.query = function(config, values, callback) {
189218
//can take in strings, config object or query object
190219
var query = (config instanceof Query) ? config : new Query(config, values, callback);

lib/connection.js

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,12 @@ p.describe = function(msg, more) {
261261
this.writer.addCString(msg.type + (msg.name || ''));
262262
this._send(0x44, more);
263263
};
264-
264+
p.sendCopyFromChunk = function (chunk) {
265+
this.stream.write(this.writer.add(chunk).flush(0x64));
266+
}
267+
p.endCopyFrom = function () {
268+
this.stream.write(this.writer.add(emptyBuffer).flush(0x63));
269+
}
265270
//parsing methods
266271
p.setBuffer = function(buffer) {
267272
if(this.lastBuffer) { //we have unfinished biznaz
@@ -311,7 +316,6 @@ p.parseMessage = function() {
311316
var msg = {
312317
length: length
313318
};
314-
315319
switch(id)
316320
{
317321

@@ -375,6 +379,21 @@ p.parseMessage = function() {
375379
msg.name = 'portalSuspended';
376380
return msg;
377381

382+
case 0x47: //G
383+
msg.name = 'copyInResponse';
384+
return this.parseGH(msg);
385+
386+
case 0x48: //H
387+
msg.name = 'copyOutResponse';
388+
return this.parseGH(msg);
389+
case 0x63: //c
390+
msg.name = 'copyDone';
391+
return msg;
392+
393+
case 0x64: //d
394+
msg.name = 'copyData';
395+
return this.parsed(msg);
396+
378397
default:
379398
throw new Error("Unrecognized message code " + id);
380399
}
@@ -505,7 +524,20 @@ p.parseA = function(msg) {
505524
msg.payload = this.parseCString();
506525
return msg;
507526
};
508-
527+
p.parseGH = function (msg) {
528+
msg.binary = Boolean(this.parseInt8());
529+
var columnCount = this.parseInt16();
530+
msg.columnTypes = [];
531+
for(var i = 0; i<columnCount; i++) {
532+
msg.columnTypes[i] = this.parseInt16();
533+
}
534+
return msg;
535+
};
536+
p.parseInt8 = function () {
537+
var value = Number(this.buffer[this.offset]);
538+
this.offset++;
539+
return value;
540+
}
509541
p.readChar = function() {
510542
return Buffer([this.buffer[this.offset++]]).toString(this.encoding);
511543
};
@@ -544,5 +576,10 @@ p.parseCString = function() {
544576
while(this.buffer[this.offset++]) { };
545577
return this.buffer.toString(this.encoding, start, this.offset - 1);
546578
};
579+
p.parsed = function (msg) {
580+
//exclude length field
581+
msg.chunk = this.readBytes(msg.length - 4);
582+
return msg;
583+
}
547584
//end parsing methods
548585
module.exports = Connection;

lib/copystream.js

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
var Stream = require('stream').Stream;
2+
var util = require('util');
3+
var CopyFromStream = function () {
4+
Stream.apply(this, arguments);
5+
this._buffer = new Buffer(0);
6+
this._connection = false;
7+
this._finished = false;
8+
this._error = false;
9+
this.__defineGetter__("writable", this._writable.bind(this));
10+
};
11+
util.inherits(CopyFromStream, Stream);
12+
CopyFromStream.prototype._writable = function () {
13+
return !this._finished && !this._error;
14+
}
15+
CopyFromStream.prototype.startStreamingToConnection = function (connection) {
16+
this._connection = connection;
17+
this._handleChunk();
18+
this._endIfConnectionReady();
19+
};
20+
CopyFromStream.prototype._handleChunk = function (string, encoding) {
21+
var dataChunk,
22+
tmpBuffer;
23+
if (string !== undefined) {
24+
if (string instanceof Buffer) {
25+
dataChunk = string;
26+
} else {
27+
dataChunk = new Buffer(string, encoding);
28+
}
29+
if (this._buffer.length) {
30+
//Buffer.concat is better, but it's missing
31+
//in node v0.6.x
32+
tmpBuffer = new Buffer(this._buffer.length + dataChunk.length);
33+
tmpBuffer.copy(this._buffer);
34+
tmpBuffer.copy(dataChunk, this._buffer.length);
35+
this._buffer = tmpBuffer;
36+
} else {
37+
this._buffer = dataChunk;
38+
}
39+
}
40+
return this._sendIfConnectionReady();
41+
};
42+
CopyFromStream.prototype._sendIfConnectionReady = function () {
43+
var dataSent = false;
44+
if (this._connection && this._buffer.length) {
45+
dataSent = this._connection.sendCopyFromChunk(this._buffer);
46+
this._buffer = new Buffer(0);
47+
}
48+
return dataSent;
49+
};
50+
CopyFromStream.prototype._endIfConnectionReady = function () {
51+
if (this._connection && this._finished) {
52+
//TODO change function name
53+
this._connection.endCopyFrom();
54+
}
55+
}
56+
CopyFromStream.prototype.write = function (string, encoding) {
57+
if (!this._writable) {
58+
//TODO possibly throw exception?
59+
return false;
60+
}
61+
return this._handleChunk.apply(this, arguments);
62+
};
63+
CopyFromStream.prototype.end = function (string, encondig) {
64+
if(!this._writable) {
65+
//TODO possibly throw exception?
66+
return false;
67+
}
68+
this._finished = true;
69+
if (string !== undefined) {
70+
this._handleChunk.apply(this, arguments);
71+
};
72+
this._endIfConnectionReady();
73+
};
74+
CopyFromStream.prototype.error = function (error) {
75+
this._error = true;
76+
this.emit('error', error);
77+
};
78+
CopyFromStream.prototype.close = function () {
79+
this.emit("close");
80+
};
81+
var CopyToStream = function () {
82+
Stream.apply(this, arguments);
83+
this._error = false;
84+
this._finished = false;
85+
this._paused = false;
86+
this.buffer = new Buffer(0);
87+
this._encoding = undefined;
88+
this.__defineGetter__('readable', this._readable.bind(this));
89+
};
90+
util.inherits(CopyToStream, Stream);
91+
CopyToStream.prototype._outputDataChunk = function () {
92+
if (this._paused) {
93+
return;
94+
}
95+
if (this.buffer.length) {
96+
if (this._encoding) {
97+
this.emit('data', this.buffer.toString(encoding));
98+
} else {
99+
this.emit('data', this.buffer);
100+
}
101+
this.buffer = new Buffer(0);
102+
}
103+
};
104+
CopyToStream.prototype._readable = function () {
105+
return !this._finished && !this._error;
106+
}
107+
CopyToStream.prototype.error = function (error) {
108+
if (!this.readable) {
109+
return false;
110+
}
111+
this._error = error;
112+
if (!this._paused) {
113+
this.emit('error', error);
114+
}
115+
};
116+
CopyToStream.prototype.close = function () {
117+
if (!this.readable) {
118+
return false;
119+
}
120+
this._finished = true;
121+
if (!this._paused) {
122+
this.emit("end");
123+
}
124+
};
125+
CopyToStream.prototype.handleChunk = function (chunk) {
126+
var tmpBuffer;
127+
if (!this.readable) {
128+
return;
129+
}
130+
if (!this.buffer.length) {
131+
this.buffer = chunk;
132+
} else {
133+
tmpBuffer = new Buffer(this.buffer.length + chunk.length);
134+
this.buffer.copy(tmpBuffer);
135+
chunk.copy(tmpBuffer, this.buffer.length);
136+
this.buffer = tmpBuffer;
137+
}
138+
this._outputDataChunk();
139+
};
140+
CopyToStream.prototype.pause = function () {
141+
if (!this.readable) {
142+
return false;
143+
}
144+
this._paused = true;
145+
};
146+
CopyToStream.prototype.resume = function () {
147+
if (!this._paused) {
148+
return false;
149+
}
150+
this._paused = false;
151+
this._outputDataChunk();
152+
if (this._error) {
153+
return this.emit('error', this._error);
154+
}
155+
if (this._finished) {
156+
return this.emit('end');
157+
}
158+
};
159+
CopyToStream.prototype.setEncoding = function (encoding) {
160+
this._encoding = encoding;
161+
};
162+
163+
module.exports = {
164+
CopyFromStream: CopyFromStream,
165+
CopyToStream: CopyToStream
166+
};

lib/native/index.js

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//require the c++ bindings & export to javascript
22
var EventEmitter = require('events').EventEmitter;
33
var utils = require(__dirname + "/../utils");
4+
var CopyFromStream = require(__dirname + '/../copystream').CopyFromStream;
5+
var CopyToStream = require(__dirname + '/../copystream').CopyToStream;
46

57
var binding;
68

@@ -48,7 +50,31 @@ p.connect = function(cb) {
4850
nativeConnect.call(self, conString);
4951
})
5052
}
51-
53+
p._copy = function (text, stream) {
54+
var q = new NativeQuery(text, function (error) {
55+
if (error) {
56+
q.stream.error(error);
57+
} else {
58+
q.stream.close();
59+
}
60+
});
61+
q.stream = stream;
62+
this._queryQueue.push(q);
63+
this._pulseQueryQueue();
64+
return q.stream;
65+
}
66+
p.copyFrom = function (text) {
67+
return this._copy(text, new CopyFromStream());
68+
};
69+
p.copyTo = function (text) {
70+
return this._copy(text, new CopyToStream());
71+
};
72+
p.sendCopyFromChunk = function (chunk) {
73+
this._sendCopyFromChunk(chunk);
74+
};
75+
p.endCopyFrom = function () {
76+
this._endCopyFrom();
77+
};
5278
p.query = function(config, values, callback) {
5379
var query = (config instanceof NativeQuery) ? config : new NativeQuery(config, values, callback);
5480
this._queryQueue.push(query);
@@ -167,7 +193,12 @@ var clientBuilder = function(config) {
167193
connection._pulseQueryQueue();
168194
}
169195
});
170-
196+
connection.on('_copyInResponse', function () {
197+
connection._activeQuery.streamData(connection);
198+
});
199+
connection.on('_copyData', function (chunk) {
200+
connection._activeQuery.handleCopyFromChunk(chunk);
201+
});
171202
return connection;
172203
};
173204

lib/native/query.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,5 +67,10 @@ p.handleReadyForQuery = function(meta) {
6767
}
6868
this.emit('end', this._result);
6969
};
70-
70+
p.streamData = function (connection) {
71+
this.stream.startStreamingToConnection(connection);
72+
};
73+
p.handleCopyFromChunk = function (chunk) {
74+
this.stream.handleChunk(chunk);
75+
}
7176
module.exports = NativeQuery;

lib/query.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ var Query = function(config, values, callback) {
1717
this.types = config.types;
1818
this.name = config.name;
1919
this.binary = config.binary;
20+
this.stream = config.stream;
2021
//use unique portal name each time
2122
this.portal = config.portal || ""
2223
this.callback = config.callback;
@@ -168,5 +169,10 @@ p.prepare = function(connection) {
168169

169170
this.getRows(connection);
170171
};
171-
172+
p.streamData = function (connection) {
173+
this.stream.startStreamingToConnection(connection);
174+
};
175+
p.handleCopyFromChunk = function (chunk) {
176+
this.stream.handleChunk(chunk);
177+
}
172178
module.exports = Query;

0 commit comments

Comments
 (0)