@@ -45,6 +45,8 @@ const {
4545 ERR_UNKNOWN_ENCODING
4646} = require('internal/errors').codes;
4747
48+ const { errorOrDestroy } = destroyImpl;
49+
4850util.inherits(Writable, Stream);
4951
5052function nop() {}
@@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex) {
147149 // Should close be emitted on destroy. Defaults to true.
148150 this.emitClose = options.emitClose !== false;
149151
152+ // Should .destroy() be called after 'finish' (and potentially 'end')
153+ this.autoDestroy = !!options.autoDestroy;
154+
150155 // count buffered requests
151156 this.bufferedRequestCount = 0;
152157
@@ -235,14 +240,14 @@ function Writable(options) {
235240
236241// Otherwise people can pipe Writable streams, which is just wrong.
237242Writable.prototype.pipe = function() {
238- this.emit('error' , new ERR_STREAM_CANNOT_PIPE());
243+ errorOrDestroy(this , new ERR_STREAM_CANNOT_PIPE());
239244};
240245
241246
242247function writeAfterEnd(stream, cb) {
243248 var er = new ERR_STREAM_WRITE_AFTER_END();
244249 // TODO: defer error events consistently everywhere, not just the cb
245- stream.emit('error' , er);
250+ errorOrDestroy(stream , er);
246251 process.nextTick(cb, er);
247252}
248253
@@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb) {
258263 er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
259264 }
260265 if (er) {
261- stream.emit('error' , er);
266+ errorOrDestroy(stream , er);
262267 process.nextTick(cb, er);
263268 return false;
264269 }
@@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb) {
422427 // after error
423428 process.nextTick(finishMaybe, stream, state);
424429 stream._writableState.errorEmitted = true;
425- stream.emit('error' , er);
430+ errorOrDestroy(stream , er);
426431 } else {
427432 // the caller expect this to happen before if
428433 // it is async
429434 cb(er);
430435 stream._writableState.errorEmitted = true;
431- stream.emit('error' , er);
436+ errorOrDestroy(stream , er);
432437 // this can emit finish, but finish must
433438 // always follow error
434439 finishMaybe(stream, state);
@@ -612,7 +617,7 @@ function callFinal(stream, state) {
612617 stream._final((err) => {
613618 state.pendingcb--;
614619 if (err) {
615- stream.emit('error' , err);
620+ errorOrDestroy(stream , err);
616621 }
617622 state.prefinished = true;
618623 stream.emit('prefinish');
@@ -639,6 +644,15 @@ function finishMaybe(stream, state) {
639644 if (state.pendingcb === 0) {
640645 state.finished = true;
641646 stream.emit('finish');
647+
648+ if (state.autoDestroy) {
649+ // In case of duplex streams we need a way to detect
650+ // if the readable side is ready for autoDestroy as well
651+ const rState = stream._readableState;
652+ if (!rState || (rState.autoDestroy && rState.endEmitted)) {
653+ stream.destroy();
654+ }
655+ }
642656 }
643657 }
644658 return need;
0 commit comments