From 1570cc348c84ad10c0984d193bf3af97b48f0efc Mon Sep 17 00:00:00 2001 From: Alex Kocharin Date: Fri, 27 Sep 2013 13:54:43 +0400 Subject: [PATCH] through -> streams2 transition for local-fs --- lib/local-fs.js | 28 +++++++++--------------- lib/streams.js | 58 ++++++++++++++++++++++++++++++++++++------------- 2 files changed, 53 insertions(+), 33 deletions(-) diff --git a/lib/local-fs.js b/lib/local-fs.js index 5e75bc478..0b7b10dba 100644 --- a/lib/local-fs.js +++ b/lib/local-fs.js @@ -1,6 +1,6 @@ var fs = require('fs'); var Path = require('path'); -var through = require('through'); +var mystreams = require('./streams'); var FSError = require('./error').FSError; function make_directories(dest, cb) { @@ -38,22 +38,15 @@ function write(dest, data, cb) { } function write_stream(name) { - var stream = through(function(data) { - this.queue(data); - }, function() { - this.queue(null); - }, {autoDestroy: false}); - stream.pause(); + var stream = new mystreams.UploadTarballStream(); fs.exists(name, function(exists) { if (exists) return stream.emit('error', new FSError('EEXISTS')); var tmpname = name + '.tmp-'+String(Math.random()).replace(/^0\./, ''); var file = fs.createWriteStream(tmpname); - stream.on('data', function(data) { - file.write(data); - }); - stream.on('end', function() { + stream.pipe(file); + stream.done = function() { file.on('close', function() { fs.rename(tmpname, name, function(err) { if (err) stream.emit('error', err); @@ -61,18 +54,17 @@ function write_stream(name) { }); }); file.destroySoon(); - }); - file.on('open', function() { - // re-emitting open because it's handled in storage.js - stream.emit('open'); - }); - stream.on('abort', function() { + }; + stream.abort = function() { file.on('close', function() { fs.unlink(tmpname); }); file.destroy(); + }; + file.on('open', function() { + // re-emitting open because it's handled in storage.js + stream.emit('open'); }); - stream.resume(); }); return stream; } diff --git a/lib/streams.js b/lib/streams.js index 1115771c7..330c00043 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -6,12 +6,10 @@ var util = require('util'); // function ReadTarball(options) { stream.PassThrough.call(this, options); -} -// called when data is not needed anymore -UploadTarball.prototype.abort = function() { - this.emit('error', new Error('not implemented')); -}; + // called when data is not needed anymore + add_abstract_method(this, 'abort'); +} util.inherits(ReadTarball, stream.PassThrough); module.exports.ReadTarballStream = ReadTarball; @@ -21,18 +19,48 @@ module.exports.ReadTarballStream = ReadTarball; // function UploadTarball(options) { stream.Writable.call(this, options); + + // called when user closes connection before upload finishes + add_abstract_method(this, 'abort'); + + // called when upload finishes successfully + add_abstract_method(this, 'done'); } -// called when user closes connection before upload finishes -UploadTarball.prototype.abort = function() { - this.emit('error', new Error('not implemented')); -}; - -// called when upload finishes successfully -UploadTarball.prototype.done = function() { - this.emit('error', new Error('not implemented')); -}; - util.inherits(UploadTarball, stream.Writable); module.exports.UploadTarballStream = UploadTarball; +// +// This function intercepts abstract calls and replays them allowing +// us to attach those functions after we are ready to do so +// +function add_abstract_method(self, name) { + self._called_methods = self._called_methods || {}; + self.__defineGetter__(name, function() { + return function() { + self._called_methods[name] = true; + } + }); + self.__defineSetter__(name, function(fn) { + delete self[name]; + self[name] = fn; + if (self._called_methods && self._called_methods[name]) { + delete self._called_methods[name]; + self[name](); + } + }); +} + +module.exports.__test = function() { + var test = new ReadTarball(); + test.abort(); + setTimeout(function() { + test.abort = function() { + console.log('ok'); + }; + test.abort = function() { + throw 'fail'; + }; + }, 100); +} +