0
Fork 0
mirror of https://github.com/verdaccio/verdaccio.git synced 2024-12-30 22:34:10 -05:00

through -> streams2 transition for local-fs

This commit is contained in:
Alex Kocharin 2013-09-27 13:54:43 +04:00
parent ac2ea00b2b
commit 1570cc348c
2 changed files with 53 additions and 33 deletions

View file

@ -1,6 +1,6 @@
var fs = require('fs');
var Path = require('path');
var through = require('through');
var mystreams = require('./streams');
var FSError = require('./error').FSError;
function make_directories(dest, cb) {
@ -38,22 +38,15 @@ function write(dest, data, cb) {
}
function write_stream(name) {
var stream = through(function(data) {
this.queue(data);
}, function() {
this.queue(null);
}, {autoDestroy: false});
stream.pause();
var stream = new mystreams.UploadTarballStream();
fs.exists(name, function(exists) {
if (exists) return stream.emit('error', new FSError('EEXISTS'));
var tmpname = name + '.tmp-'+String(Math.random()).replace(/^0\./, '');
var file = fs.createWriteStream(tmpname);
stream.on('data', function(data) {
file.write(data);
});
stream.on('end', function() {
stream.pipe(file);
stream.done = function() {
file.on('close', function() {
fs.rename(tmpname, name, function(err) {
if (err) stream.emit('error', err);
@ -61,18 +54,17 @@ function write_stream(name) {
});
});
file.destroySoon();
});
file.on('open', function() {
// re-emitting open because it's handled in storage.js
stream.emit('open');
});
stream.on('abort', function() {
};
stream.abort = function() {
file.on('close', function() {
fs.unlink(tmpname);
});
file.destroy();
};
file.on('open', function() {
// re-emitting open because it's handled in storage.js
stream.emit('open');
});
stream.resume();
});
return stream;
}

View file

@ -6,12 +6,10 @@ var util = require('util');
//
function ReadTarball(options) {
stream.PassThrough.call(this, options);
}
// called when data is not needed anymore
UploadTarball.prototype.abort = function() {
this.emit('error', new Error('not implemented'));
};
add_abstract_method(this, 'abort');
}
util.inherits(ReadTarball, stream.PassThrough);
module.exports.ReadTarballStream = ReadTarball;
@ -21,18 +19,48 @@ module.exports.ReadTarballStream = ReadTarball;
//
function UploadTarball(options) {
stream.Writable.call(this, options);
}
// called when user closes connection before upload finishes
UploadTarball.prototype.abort = function() {
this.emit('error', new Error('not implemented'));
};
add_abstract_method(this, 'abort');
// called when upload finishes successfully
UploadTarball.prototype.done = function() {
this.emit('error', new Error('not implemented'));
};
add_abstract_method(this, 'done');
}
util.inherits(UploadTarball, stream.Writable);
module.exports.UploadTarballStream = UploadTarball;
//
// This function intercepts abstract calls and replays them allowing
// us to attach those functions after we are ready to do so
//
function add_abstract_method(self, name) {
self._called_methods = self._called_methods || {};
self.__defineGetter__(name, function() {
return function() {
self._called_methods[name] = true;
}
});
self.__defineSetter__(name, function(fn) {
delete self[name];
self[name] = fn;
if (self._called_methods && self._called_methods[name]) {
delete self._called_methods[name];
self[name]();
}
});
}
module.exports.__test = function() {
var test = new ReadTarball();
test.abort();
setTimeout(function() {
test.abort = function() {
console.log('ok');
};
test.abort = function() {
throw 'fail';
};
}, 100);
}