Skip to content

Commit 016afe2

Browse files
koichikisaacs
authored andcommitted
streams: fix pipe is destructed by 'end' from destination
1 parent 109f8e2 commit 016afe2

File tree

2 files changed

+37
-12
lines changed

2 files changed

+37
-12
lines changed

lib/stream.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ Stream.prototype.pipe = function(dest, options) {
106106
source.on('end', cleanup);
107107
source.on('close', cleanup);
108108

109-
dest.on('end', cleanup);
110109
dest.on('close', cleanup);
111110

112111
dest.emit('pipe', source);

test/simple/test-stream-pipe-cleanup.js

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,17 @@ function Readable() {
4747
}
4848
util.inherits(Readable, stream.Stream);
4949

50+
function Duplex() {
51+
this.readable = true;
52+
Writable.call(this);
53+
}
54+
util.inherits(Duplex, Writable);
55+
5056
var i = 0;
5157
var limit = 100;
5258

5359
var w = new Writable();
5460

55-
console.error = function(text) {
56-
throw new Error(text);
57-
};
58-
5961
var r;
6062

6163
for (i = 0; i < limit; i++) {
@@ -80,17 +82,41 @@ w.endCalls = 0;
8082

8183
r = new Readable();
8284

83-
for (i = 0; i < limit; i++) {
84-
w = new Writable();
85-
r.pipe(w);
86-
w.emit('end');
87-
}
88-
assert.equal(0, w.listeners('end').length);
89-
9085
for (i = 0; i < limit; i++) {
9186
w = new Writable();
9287
r.pipe(w);
9388
w.emit('close');
9489
}
9590
assert.equal(0, w.listeners('close').length);
9691

92+
r = new Readable();
93+
w = new Writable();
94+
var d = new Duplex();
95+
r.pipe(d); // pipeline A
96+
d.pipe(w); // pipeline B
97+
assert.equal(r.listeners('end').length, 2); // A.onend, A.cleanup
98+
assert.equal(r.listeners('close').length, 2); // A.onclose, A.cleanup
99+
assert.equal(d.listeners('end').length, 2); // B.onend, B.cleanup
100+
assert.equal(d.listeners('close').length, 3); // A.cleanup, B.onclose, B.cleanup
101+
assert.equal(w.listeners('end').length, 0);
102+
assert.equal(w.listeners('close').length, 1); // B.cleanup
103+
104+
r.emit('end');
105+
assert.equal(d.endCalls, 1);
106+
assert.equal(w.endCalls, 0);
107+
assert.equal(r.listeners('end').length, 0);
108+
assert.equal(r.listeners('close').length, 0);
109+
assert.equal(d.listeners('end').length, 2); // B.onend, B.cleanup
110+
assert.equal(d.listeners('close').length, 2); // B.onclose, B.cleanup
111+
assert.equal(w.listeners('end').length, 0);
112+
assert.equal(w.listeners('close').length, 1); // B.cleanup
113+
114+
d.emit('end');
115+
assert.equal(d.endCalls, 1);
116+
assert.equal(w.endCalls, 1);
117+
assert.equal(r.listeners('end').length, 0);
118+
assert.equal(r.listeners('close').length, 0);
119+
assert.equal(d.listeners('end').length, 0);
120+
assert.equal(d.listeners('close').length, 0);
121+
assert.equal(w.listeners('end').length, 0);
122+
assert.equal(w.listeners('close').length, 0);

0 commit comments

Comments
 (0)