0
Fork 0
mirror of https://github.com/verdaccio/verdaccio.git synced 2025-01-20 22:52:46 -05:00

uploading tarballs bugfixes

This commit is contained in:
Alex Kocharin 2013-09-28 16:19:40 +04:00
parent f92a839a7f
commit 8c5628bfae
4 changed files with 133 additions and 8 deletions

View file

@ -40,14 +40,20 @@ function write(dest, data, cb) {
function write_stream(name) { function write_stream(name) {
var stream = new mystreams.UploadTarballStream(); var stream = new mystreams.UploadTarballStream();
var _ended = 0;
stream.on('end', function() {
_ended = 1;
});
fs.exists(name, function(exists) { fs.exists(name, function(exists) {
if (exists) return stream.emit('error', new FSError('EEXISTS')); if (exists) return stream.emit('error', new FSError('EEXISTS'));
var tmpname = name + '.tmp-'+String(Math.random()).replace(/^0\./, ''); var tmpname = name + '.tmp-'+String(Math.random()).replace(/^0\./, '');
var file = fs.createWriteStream(tmpname); var file = fs.createWriteStream(tmpname);
stream.pipe(file); stream.pipe(file);
stream.done = function() { stream.done = function() {
stream.on('end', function() { function onend() {
file.on('close', function() { file.on('close', function() {
fs.rename(tmpname, name, function(err) { fs.rename(tmpname, name, function(err) {
if (err) stream.emit('error', err); if (err) stream.emit('error', err);
@ -55,18 +61,26 @@ function write_stream(name) {
}); });
}); });
file.destroySoon(); file.destroySoon();
}); }
if (_ended) {
onend();
} else {
stream.on('end', onend);
}
}; };
stream.abort = function() { stream.abort = function() {
file.on('close', function() { file.on('close', function() {
fs.unlink(tmpname); fs.unlink(tmpname);
}); });
file.destroy(); file.destroySoon();
}; };
file.on('open', function() { file.on('open', function() {
// re-emitting open because it's handled in storage.js // re-emitting open because it's handled in storage.js
stream.emit('open'); stream.emit('open');
}); });
file.on('error', function(err) {
stream.emit('error', err);
});
}); });
return stream; return stream;
} }

View file

@ -136,6 +136,12 @@ Storage.prototype.add_version = function(name, version, metadata, tag, callback)
Storage.prototype.add_tarball = function(name, filename) { Storage.prototype.add_tarball = function(name, filename) {
var stream = new mystreams.UploadTarballStream(); var stream = new mystreams.UploadTarballStream();
var _transform = stream._transform;
var length = 0;
stream._transform = function(data) {
length += data.length;
_transform.apply(stream, arguments);
};
var self = this; var self = this;
if (name === info_file || name === '__proto__') { if (name === info_file || name === '__proto__') {
@ -153,6 +159,15 @@ Storage.prototype.add_tarball = function(name, filename) {
status: 409, status: 409,
msg: 'this tarball is already present' msg: 'this tarball is already present'
})); }));
} else if (err.code === 'ENOENT') {
// check if package exists to throw an appropriate message
self.get_package(name, function(_err, res) {
if (_err) {
stream.emit('error', _err);
} else {
stream.emit('error', err);
}
});
} else { } else {
stream.emit('error', err); stream.emit('error', err);
} }
@ -170,7 +185,15 @@ Storage.prototype.add_tarball = function(name, filename) {
wstream.abort(); wstream.abort();
}; };
stream.done = function() { stream.done = function() {
if (!length) {
stream.emit('error', new UError({
status: 422,
msg: 'refusing to accept zero-length file'
}));
wstream.abort();
} else {
wstream.done(); wstream.done();
}
}; };
stream.pipe(wstream); stream.pipe(wstream);

View file

@ -132,14 +132,69 @@ Storage.prototype.add_version = function(name, version, metadata, tag, callback)
} }
// //
// Upload a tarball to a storage for {name} package // Upload a tarball for {name} package
// //
// Function is syncronous and returns a WritableStream // Function is syncronous and returns a WritableStream
// //
// Used storages: local // Function uploads a tarball to all uplinks with write access and to
// local storage in parallel with a speed of a slowest pipe. It reports
// success if all uploads succeed.
//
// Used storages: local (write) && uplinks (proxy_publish, write)
// //
Storage.prototype.add_tarball = function(name, filename) { Storage.prototype.add_tarball = function(name, filename) {
return this.local.add_tarball(name, filename); var stream = new mystreams.UploadTarballStream();
var self = this;
var upstreams = [];
upstreams.push(self.local.add_tarball(name, filename));
for (var i in self.uplinks) {
if (self.config.proxy_publish(name, i)) {
upstreams.push(self.uplinks[i].add_tarball(name, filename));
}
}
function bail(err) {
upstreams.forEach(function(upstream) {
upstream.abort();
});
}
upstreams.forEach(function(upstream) {
stream.pipe(upstream);
upstream.on('error', function(err) {
if (err.code === 'EEXISTS') {
stream.emit('error', new UError({
status: 409,
msg: 'this tarball is already present'
}));
} else {
stream.emit('error', err);
}
bail(err);
});
upstream.on('success', function() {
upstream._sinopia_success = true;
if (upstreams.filter(function(upstream) {
return !upstream._sinopia_success;
}).length == 0) {
stream.emit('success');
}
});
});
stream.abort = function() {
bail();
};
stream.done = function() {
upstreams.forEach(function(upstream) {
upstream.done();
});
};
return stream;
} }
// //

View file

@ -74,7 +74,40 @@ Storage.prototype.add_version = function(name, version, metadata, tag, callback)
} }
Storage.prototype.add_tarball = function(name, filename) { Storage.prototype.add_tarball = function(name, filename) {
throw new Error('unimplemented'); var stream = new mystreams.UploadTarballStream();
var self = this;
var wstream = request({
uri: this.config.url + '/' + escape(name) + '/-/' + escape(filename) + '/whatever',
method: 'PUT',
headers: {
'User-Agent': this.ua,
'content-type': 'application/octet-stream'
},
ca: this.ca,
});
wstream.on('response', function(res) {
if (!(res.statusCode >= 200 && res.statusCode < 300)) {
return stream.emit('error', new UError({
msg: 'bad uplink status code: ' + res.statusCode,
status: 500,
}));
}
stream.emit('success');
});
wstream.on('error', function(err) {
stream.emit('error', err);
});
stream.abort = function() {
wstream.req.abort();
};
stream.done = function() {};
stream.pipe(wstream);
return stream;
} }
Storage.prototype.get_package = function(name, callback) { Storage.prototype.get_package = function(name, callback) {