From 8c5628bfae6cc41de4a078904c858795330263f9 Mon Sep 17 00:00:00 2001 From: Alex Kocharin Date: Sat, 28 Sep 2013 16:19:40 +0400 Subject: [PATCH] uploading tarballs bugfixes --- lib/local-fs.js | 20 ++++++++++++--- lib/local-storage.js | 25 +++++++++++++++++- lib/storage.js | 61 +++++++++++++++++++++++++++++++++++++++++--- lib/up-storage.js | 35 ++++++++++++++++++++++++- 4 files changed, 133 insertions(+), 8 deletions(-) diff --git a/lib/local-fs.js b/lib/local-fs.js index a51f45f97..efb748aa4 100644 --- a/lib/local-fs.js +++ b/lib/local-fs.js @@ -40,14 +40,20 @@ function write(dest, data, cb) { function write_stream(name) { var stream = new mystreams.UploadTarballStream(); + var _ended = 0; + stream.on('end', function() { + _ended = 1; + }); + fs.exists(name, function(exists) { if (exists) return stream.emit('error', new FSError('EEXISTS')); var tmpname = name + '.tmp-'+String(Math.random()).replace(/^0\./, ''); var file = fs.createWriteStream(tmpname); stream.pipe(file); + stream.done = function() { - stream.on('end', function() { + function onend() { file.on('close', function() { fs.rename(tmpname, name, function(err) { if (err) stream.emit('error', err); @@ -55,18 +61,26 @@ function write_stream(name) { }); }); file.destroySoon(); - }); + } + if (_ended) { + onend(); + } else { + stream.on('end', onend); + } }; stream.abort = function() { file.on('close', function() { fs.unlink(tmpname); }); - file.destroy(); + file.destroySoon(); }; file.on('open', function() { // re-emitting open because it's handled in storage.js stream.emit('open'); }); + file.on('error', function(err) { + stream.emit('error', err); + }); }); return stream; } diff --git a/lib/local-storage.js b/lib/local-storage.js index ed50241d6..89904d964 100644 --- a/lib/local-storage.js +++ b/lib/local-storage.js @@ -136,6 +136,12 @@ Storage.prototype.add_version = function(name, version, metadata, tag, callback) Storage.prototype.add_tarball = function(name, filename) { var stream = new mystreams.UploadTarballStream(); + var _transform = stream._transform; + var length = 0; + stream._transform = function(data) { + length += data.length; + _transform.apply(stream, arguments); + }; var self = this; if (name === info_file || name === '__proto__') { @@ -153,6 +159,15 @@ Storage.prototype.add_tarball = function(name, filename) { status: 409, msg: 'this tarball is already present' })); + } else if (err.code === 'ENOENT') { + // check if package exists to throw an appropriate message + self.get_package(name, function(_err, res) { + if (_err) { + stream.emit('error', _err); + } else { + stream.emit('error', err); + } + }); } else { stream.emit('error', err); } @@ -170,7 +185,15 @@ Storage.prototype.add_tarball = function(name, filename) { wstream.abort(); }; stream.done = function() { - wstream.done(); + if (!length) { + stream.emit('error', new UError({ + status: 422, + msg: 'refusing to accept zero-length file' + })); + wstream.abort(); + } else { + wstream.done(); + } }; stream.pipe(wstream); diff --git a/lib/storage.js b/lib/storage.js index 6893138d0..0fc97fad0 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -132,14 +132,69 @@ Storage.prototype.add_version = function(name, version, metadata, tag, callback) } // -// Upload a tarball to a storage for {name} package +// Upload a tarball for {name} package // // Function is syncronous and returns a WritableStream // -// Used storages: local +// Function uploads a tarball to all uplinks with write access and to +// local storage in parallel with a speed of a slowest pipe. It reports +// success if all uploads succeed. +// +// Used storages: local (write) && uplinks (proxy_publish, write) // Storage.prototype.add_tarball = function(name, filename) { - return this.local.add_tarball(name, filename); + var stream = new mystreams.UploadTarballStream(); + + var self = this; + var upstreams = []; + + upstreams.push(self.local.add_tarball(name, filename)); + for (var i in self.uplinks) { + if (self.config.proxy_publish(name, i)) { + upstreams.push(self.uplinks[i].add_tarball(name, filename)); + } + } + + function bail(err) { + upstreams.forEach(function(upstream) { + upstream.abort(); + }); + } + + upstreams.forEach(function(upstream) { + stream.pipe(upstream); + + upstream.on('error', function(err) { + if (err.code === 'EEXISTS') { + stream.emit('error', new UError({ + status: 409, + msg: 'this tarball is already present' + })); + } else { + stream.emit('error', err); + } + bail(err); + }); + upstream.on('success', function() { + upstream._sinopia_success = true; + if (upstreams.filter(function(upstream) { + return !upstream._sinopia_success; + }).length == 0) { + stream.emit('success'); + } + }); + }); + + stream.abort = function() { + bail(); + }; + stream.done = function() { + upstreams.forEach(function(upstream) { + upstream.done(); + }); + }; + + return stream; } // diff --git a/lib/up-storage.js b/lib/up-storage.js index 658880302..e5e2a6ec4 100644 --- a/lib/up-storage.js +++ b/lib/up-storage.js @@ -74,7 +74,40 @@ Storage.prototype.add_version = function(name, version, metadata, tag, callback) } Storage.prototype.add_tarball = function(name, filename) { - throw new Error('unimplemented'); + var stream = new mystreams.UploadTarballStream(); + var self = this; + + var wstream = request({ + uri: this.config.url + '/' + escape(name) + '/-/' + escape(filename) + '/whatever', + method: 'PUT', + headers: { + 'User-Agent': this.ua, + 'content-type': 'application/octet-stream' + }, + ca: this.ca, + }); + + wstream.on('response', function(res) { + if (!(res.statusCode >= 200 && res.statusCode < 300)) { + return stream.emit('error', new UError({ + msg: 'bad uplink status code: ' + res.statusCode, + status: 500, + })); + } + stream.emit('success'); + }); + + wstream.on('error', function(err) { + stream.emit('error', err); + }); + + stream.abort = function() { + wstream.req.abort(); + }; + stream.done = function() {}; + stream.pipe(wstream); + + return stream; } Storage.prototype.get_package = function(name, callback) {