Skip to content

Commit 9828615

Browse files
antonbrianc
authored andcommitted
bugfixes in copy from stream. drain event
1 parent d2b21aa commit 9828615

File tree

1 file changed

+34
-14
lines changed

1 file changed

+34
-14
lines changed

lib/copystream.js

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,23 @@ var CopyFromStream = function () {
55
this._buffer = new Buffer(0);
66
this._connection = false;
77
this._finished = false;
8+
this._finishedSent = false;
9+
this._closed = false;
810
this._error = false;
11+
this._dataBuffered = false;
912
this.__defineGetter__("writable", this._writable.bind(this));
1013
};
1114
util.inherits(CopyFromStream, Stream);
1215
CopyFromStream.prototype._writable = function () {
13-
return !this._finished && !this._error;
16+
return !(this._finished || this._error);
1417
}
1518
CopyFromStream.prototype.startStreamingToConnection = function (connection) {
19+
if (this._error) {
20+
return;
21+
}
1622
this._connection = connection;
17-
this._handleChunk();
18-
this._endIfConnectionReady();
23+
this._sendIfConnectionReady();
24+
this._endIfNeedAndPossible();
1925
};
2026
CopyFromStream.prototype._handleChunk = function (string, encoding) {
2127
var dataChunk,
@@ -30,52 +36,66 @@ CopyFromStream.prototype._handleChunk = function (string, encoding) {
3036
//Buffer.concat is better, but it's missing
3137
//in node v0.6.x
3238
tmpBuffer = new Buffer(this._buffer.length + dataChunk.length);
33-
tmpBuffer.copy(this._buffer);
34-
tmpBuffer.copy(dataChunk, this._buffer.length);
39+
this._buffer.copy(tmpBuffer);
40+
dataChunk.copy(tmpBuffer, this._buffer.length);
3541
this._buffer = tmpBuffer;
3642
} else {
3743
this._buffer = dataChunk;
3844
}
3945
}
46+
4047
return this._sendIfConnectionReady();
4148
};
4249
CopyFromStream.prototype._sendIfConnectionReady = function () {
4350
var dataSent = false;
44-
if (this._connection && this._buffer.length) {
51+
if (this._connection) {
4552
dataSent = this._connection.sendCopyFromChunk(this._buffer);
4653
this._buffer = new Buffer(0);
54+
if (this._dataBuffered) {
55+
this.emit('drain');
56+
}
57+
this._dataBuffered = false;
58+
} else {
59+
this._dataBuffered = true;
4760
}
4861
return dataSent;
4962
};
50-
CopyFromStream.prototype._endIfConnectionReady = function () {
51-
if (this._connection && this._finished) {
52-
//TODO change function name
63+
CopyFromStream.prototype._endIfNeedAndPossible = function () {
64+
if (this._connection && this._finished && !this._finishedSent) {
65+
this._finishedSent = true;
5366
this._connection.endCopyFrom();
5467
}
5568
}
5669
CopyFromStream.prototype.write = function (string, encoding) {
57-
if (!this._writable) {
58-
//TODO possibly throw exception?
70+
if (this._error || this._finished) {
5971
return false;
6072
}
6173
return this._handleChunk.apply(this, arguments);
6274
};
6375
CopyFromStream.prototype.end = function (string, encondig) {
64-
if(!this._writable) {
65-
//TODO possibly throw exception?
76+
if (this._error || this._finished) {
6677
return false;
6778
}
6879
this._finished = true;
6980
if (string !== undefined) {
7081
this._handleChunk.apply(this, arguments);
7182
};
72-
this._endIfConnectionReady();
83+
this._endIfNeedAndPossible();
7384
};
7485
CopyFromStream.prototype.error = function (error) {
86+
if (this._error || this._closed) {
87+
return false;
88+
}
7589
this._error = true;
7690
this.emit('error', error);
7791
};
7892
CopyFromStream.prototype.close = function () {
93+
if (this._error || this._closed) {
94+
return false;
95+
}
96+
if (!this._finishedSent) {
97+
throw new Error("seems to be error in code that uses CopyFromStream");
98+
}
7999
this.emit("close");
80100
};
81101
var CopyToStream = function () {

0 commit comments

Comments
 (0)