0
Fork 0
mirror of https://github.com/verdaccio/verdaccio.git synced 2024-12-16 21:56:25 -05:00

through -> streams2 migrate - final

This commit is contained in:
Alex Kocharin 2013-09-27 15:31:28 +04:00
parent 1570cc348c
commit 5dbc825892
9 changed files with 55 additions and 35 deletions

View file

@ -164,17 +164,18 @@ module.exports = function(config_hash) {
var complete = false; var complete = false;
req.on('end', function() { req.on('end', function() {
complete = true; complete = true;
stream.done();
}); });
req.on('close', function() { req.on('close', function() {
if (!complete) { if (!complete) {
stream.emit('abort'); stream.abort();
} }
}); });
stream.on('error', function(err) { stream.on('error', function(err) {
return next(err); return next(err);
}); });
stream.on('end', function() { stream.on('success', function() {
res.status(201); res.status(201);
return res.send({ return res.send({
ok: 'tarball uploaded successfully' ok: 'tarball uploaded successfully'

View file

@ -47,13 +47,15 @@ function write_stream(name) {
var file = fs.createWriteStream(tmpname); var file = fs.createWriteStream(tmpname);
stream.pipe(file); stream.pipe(file);
stream.done = function() { stream.done = function() {
file.on('close', function() { stream.on('end', function() {
fs.rename(tmpname, name, function(err) { file.on('close', function() {
if (err) stream.emit('error', err); fs.rename(tmpname, name, function(err) {
stream.emit('close'); if (err) stream.emit('error', err);
stream.emit('success');
});
}); });
file.destroySoon();
}); });
file.destroySoon();
}; };
stream.abort = function() { stream.abort = function() {
file.on('close', function() { file.on('close', function() {

View file

@ -1,7 +1,6 @@
var fs = require('fs'); var fs = require('fs');
var semver = require('semver'); var semver = require('semver');
var Path = require('path'); var Path = require('path');
var through = require('through');
var fs_storage = require('./local-fs'); var fs_storage = require('./local-fs');
var UError = require('./error').UserError; var UError = require('./error').UserError;
var utils = require('./utils'); var utils = require('./utils');
@ -136,14 +135,10 @@ Storage.prototype.add_version = function(name, version, metadata, tag, callback)
} }
Storage.prototype.add_tarball = function(name, filename) { Storage.prototype.add_tarball = function(name, filename) {
var stream = through(function(data) { var stream = new mystreams.UploadTarballStream();
wstream.write(data);
}, function() {
wstream.end();
});
var self = this; var self = this;
if (name === info_file) { if (name === info_file || name === '__proto__') {
stream.emit('error', new UError({ stream.emit('error', new UError({
status: 403, status: 403,
msg: 'can\'t use this filename' msg: 'can\'t use this filename'
@ -167,12 +162,17 @@ Storage.prototype.add_tarball = function(name, filename) {
// re-emitting open because it's handled in storage.js // re-emitting open because it's handled in storage.js
stream.emit('open'); stream.emit('open');
}); });
wstream.on('close', function() { wstream.on('success', function() {
stream.emit('end'); // re-emitting open because it's handled in index.js
}); stream.emit('success');
stream.on('abort', function() {
wstream.emit('abort');
}); });
stream.abort = function() {
wstream.abort();
};
stream.done = function() {
wstream.done();
};
stream.pipe(wstream);
return stream; return stream;
} }

View file

@ -1,6 +1,5 @@
var async = require('async'); var async = require('async');
var semver = require('semver'); var semver = require('semver');
var through = require('through');
var UError = require('./error').UserError; var UError = require('./error').UserError;
var Local = require('./local-storage'); var Local = require('./local-storage');
var Proxy = require('./up-storage'); var Proxy = require('./up-storage');
@ -144,13 +143,10 @@ Storage.prototype.get_tarball = function(name, filename) {
var savestream = self.local.add_tarball(name, filename); var savestream = self.local.add_tarball(name, filename);
savestream.on('error', function(err) { savestream.on('error', function(err) {
console.log('xx!', name, filename);
savestream.abort(); savestream.abort();
stream.emit('error', err); stream.emit('error', err);
}); });
console.log('!', name, filename);
savestream.on('open', function() { savestream.on('open', function() {
console.log('x!', name, filename);
var rstream2 = uplink.get_url(file.url); var rstream2 = uplink.get_url(file.url);
rstream2.on('error', function(err) { rstream2.on('error', function(err) {
stream.emit('error', err); stream.emit('error', err);

View file

@ -18,7 +18,7 @@ module.exports.ReadTarballStream = ReadTarball;
// This stream is used to upload tarballs to a repository // This stream is used to upload tarballs to a repository
// //
function UploadTarball(options) { function UploadTarball(options) {
stream.Writable.call(this, options); stream.PassThrough.call(this, options);
// called when user closes connection before upload finishes // called when user closes connection before upload finishes
add_abstract_method(this, 'abort'); add_abstract_method(this, 'abort');
@ -27,7 +27,7 @@ function UploadTarball(options) {
add_abstract_method(this, 'done'); add_abstract_method(this, 'done');
} }
util.inherits(UploadTarball, stream.Writable); util.inherits(UploadTarball, stream.PassThrough);
module.exports.UploadTarballStream = UploadTarball; module.exports.UploadTarballStream = UploadTarball;
// //
@ -51,7 +51,7 @@ function add_abstract_method(self, name) {
}); });
} }
module.exports.__test = function() { function __test() {
var test = new ReadTarball(); var test = new ReadTarball();
test.abort(); test.abort();
setTimeout(function() { setTimeout(function() {

View file

@ -1,6 +1,5 @@
var URL = require('url'); var URL = require('url');
var request = require('request'); var request = require('request');
var through = require('through');
var UError = require('./error').UserError; var UError = require('./error').UserError;
var mystreams = require('./streams'); var mystreams = require('./streams');

View file

@ -25,7 +25,6 @@ dependencies:
async: '*' async: '*'
semver: '*' semver: '*'
minimatch: '*' minimatch: '*'
through: '*'
devDependencies: devDependencies:
rimraf: '*' rimraf: '*'

View file

@ -15,16 +15,14 @@ function prep(cb) {
} }
Server.prototype.request = function(options, cb) { Server.prototype.request = function(options, cb) {
options.headers = options.headers || {}; var headers = options.headers || {};
headers.accept = headers.accept || 'application/json';
headers['user-agent'] = headers['user-agent'] || this.userAgent;
headers.authorization = headers.authorization || this.auth;
return request({ return request({
url: this.url + options.uri, url: this.url + options.uri,
method: options.method || 'GET', method: options.method || 'GET',
headers: { headers: headers,
accept: options.headers.accept || 'application/json',
'user-agent': options.headers['user-agent'] || this.userAgent,
'content-type': options.headers['content-type'],
authorization: this.auth,
},
json: options.json || true, json: options.json || true,
}, cb); }, cb);
} }
@ -86,5 +84,24 @@ Server.prototype.put_tarball = function(name, filename, data, cb) {
}, prep(cb)).end(data); }, prep(cb)).end(data);
} }
Server.prototype.put_tarball_incomplete = function(name, filename, data, size, cb) {
var req = this.request({
uri: '/'+escape(name)+'/-/'+escape(filename)+'/whatever',
method: 'PUT',
headers: {
'content-type': 'application/octet-stream',
'content-length': size,
},
timeout: 1000,
}, function(err) {
assert(err);
cb();
});
req.write(data);
setTimeout(function() {
req.req.abort();
}, 20);
}
module.exports = Server; module.exports = Server;

View file

@ -67,6 +67,12 @@ ex['downloading non-existent tarball'] = function(cb) {
}); });
}; };
ex['uploading incomplete tarball'] = function(cb) {
server.put_tarball_incomplete('testpkg', 'blahblah1', readfile('fixtures/binary'), 3000, function(res, body) {
cb();
});
};
ex['uploading new tarball'] = function(cb) { ex['uploading new tarball'] = function(cb) {
server.put_tarball('testpkg', 'blahblah', readfile('fixtures/binary'), function(res, body) { server.put_tarball('testpkg', 'blahblah', readfile('fixtures/binary'), function(res, body) {
assert(res.statusCode === 201); assert(res.statusCode === 201);