0
Fork 0
mirror of https://github.com/verdaccio/verdaccio.git synced 2024-12-16 21:56:25 -05:00

working on streams

This commit is contained in:
Alex Kocharin 2013-06-20 17:07:34 +04:00
parent ad29d7fac7
commit d7eb7c9ef8
6 changed files with 158 additions and 62 deletions

View file

@ -46,6 +46,16 @@ var UserError = function(params, constr) {
util.inherits(UserError, Error); util.inherits(UserError, Error);
UserError.prototype.name = 'User Error'; UserError.prototype.name = 'User Error';
/*
* Mimic filesystem errors
*/
var FSError = function(code) {
this.code = code;
};
util.inherits(UserError, Error);
UserError.prototype.name = 'FS Error';
module.exports.AppError = AppError; module.exports.AppError = AppError;
module.exports.UserError = UserError; module.exports.UserError = UserError;
module.exports.FSError = FSError;

View file

@ -1,5 +1,7 @@
var fs = require('fs'); var fs = require('fs');
var Path = require('path'); var Path = require('path');
var through = require('through');
var FSError = require('./error').FSError;
function make_directories(dest, cb) { function make_directories(dest, cb) {
var dir = Path.dirname(dest); var dir = Path.dirname(dest);
@ -31,16 +33,47 @@ function write(dest, data, cb) {
}); });
} }
function write_stream(name) {
var stream = through(function(data) {
this.queue(data);
}, function() {
this.queue(null);
});
stream.pause();
fs.exists(name, function(exists) {
if (exists) return stream.emit('error', new FSError('EEXISTS'));
var tmpname = name + '.tmp-'+String(Math.random()).replace(/^0\./, '');
var file = fs.createWriteStream(tmpname);
stream.on('data', function(data) {
file.write(data);
});
stream.on('end', function() {
fs.rename(tmpname, name, function(err) {
if (err) stream.emit('error', err);
stream.emit('close');
});
});
stream.resume();
});
return stream;
}
function read_stream(name, stream, callback) {
return fs.createReadStream(name);
}
function create(name, contents, callback) { function create(name, contents, callback) {
fs.exists(name, function(exists) { fs.exists(name, function(exists) {
if (exists) return callback(new Error({code: 'EEXISTS'})); if (exists) return callback(new FSError('EEXISTS'));
write(name, contents, callback); write(name, contents, callback);
}); });
} }
function update(name, contents, callback) { function update(name, contents, callback) {
fs.exists(name, function(exists) { fs.exists(name, function(exists) {
if (!exists) return callback(new Error({code: 'ENOENT'})); if (!exists) return callback(new FSError('ENOENT'));
write(name, contents, callback); write(name, contents, callback);
}); });
} }
@ -75,7 +108,7 @@ Storage.prototype.create = function(name, value, cb) {
} }
Storage.prototype.create_json = function(name, value, cb) { Storage.prototype.create_json = function(name, value, cb) {
create(this.path + '/' + name, JSON.stringify(value), cb); create(this.path + '/' + name, JSON.stringify(value, null, '\t'), cb);
} }
Storage.prototype.update = function(name, value, cb) { Storage.prototype.update = function(name, value, cb) {
@ -83,7 +116,7 @@ Storage.prototype.update = function(name, value, cb) {
} }
Storage.prototype.update_json = function(name, value, cb) { Storage.prototype.update_json = function(name, value, cb) {
update(this.path + '/' + name, JSON.stringify(value), cb); update(this.path + '/' + name, JSON.stringify(value, null, '\t'), cb);
} }
Storage.prototype.write = function(name, value, cb) { Storage.prototype.write = function(name, value, cb) {
@ -94,5 +127,13 @@ Storage.prototype.write_json = function(name, value, cb) {
write(this.path + '/' + name, JSON.stringify(value, null, '\t'), cb); write(this.path + '/' + name, JSON.stringify(value, null, '\t'), cb);
} }
Storage.prototype.write_stream = function(name, value, cb) {
return write_stream(this.path + '/' + name, value, cb);
}
Storage.prototype.read_stream = function(name, cb) {
return read_stream(this.path + '/' + name, cb);
}
module.exports = Storage; module.exports = Storage;

View file

@ -78,17 +78,13 @@ module.exports = function(config_hash) {
}); });
app.get('/:package/-/:filename', can('access'), function(req, res, next) { app.get('/:package/-/:filename', can('access'), function(req, res, next) {
storage.get_tarball(req.params.package, req.params.filename, function(err, stream) { var stream = storage.get_tarball(req.params.package, req.params.filename);
if (err) return next(err); stream.on('error', function(err) {
if (!stream) { return next(err);
return next(new UError({
status: 404,
msg: 'package not found'
}));
}
res.header('content-type', 'application/octet-stream');
res.send(stream);
}); });
res.header('content-type', 'application/octet-stream');
stream.on('data', console.log);// rstream.pipe(stream);
stream.pipe(res);
}); });
//app.get('/*', function(req, res) { //app.get('/*', function(req, res) {
@ -157,8 +153,12 @@ module.exports = function(config_hash) {
app.put('/:package/-/:filename/*', can('publish'), media('application/octet-stream'), function(req, res, next) { app.put('/:package/-/:filename/*', can('publish'), media('application/octet-stream'), function(req, res, next) {
var name = req.params.package; var name = req.params.package;
storage.add_tarball(name, req.params.filename, req, function(err) { var stream = storage.add_tarball(name, req.params.filename);
if (err) return next(err); req.pipe(stream);
stream.on('error', function(err) {
return next(err);
});
stream.on('end', function() {
res.status(201); res.status(201);
return res.send({ return res.send({
ok: 'tarball uploaded successfully' ok: 'tarball uploaded successfully'

View file

@ -1,5 +1,6 @@
var fs = require('fs'); var fs = require('fs');
var semver = require('semver'); var semver = require('semver');
var through = require('through');
var fs_storage = require('./fs-storage'); var fs_storage = require('./fs-storage');
var UError = require('./error').UserError; var UError = require('./error').UserError;
var utils = require('./utils'); var utils = require('./utils');
@ -27,7 +28,7 @@ function get_boilerplate(name) {
} }
Storage.prototype.add_package = function(name, metadata, callback) { Storage.prototype.add_package = function(name, metadata, callback) {
this.storage.create_json(name + '/' + info_file, get_boilerplace(name), function(err) { this.storage.create_json(name + '/' + info_file, get_boilerplate(name), function(err) {
if (err && err.code === 'EEXISTS') { if (err && err.code === 'EEXISTS') {
return callback(new UError({ return callback(new UError({
status: 409, status: 409,
@ -92,7 +93,7 @@ Storage.prototype.update_versions = function(name, newdata, callback) {
// if tag is updated to reference latter version, that's fine // if tag is updated to reference latter version, that's fine
var need_change = var need_change =
(data['dist-tags'][tag] == null) || (data['dist-tags'][tag] == null) ||
(!semver.gt(newdata['dist-tags'], data['dist-tags'][tag])); (!semver.gt(newdata['dist-tags'][tag], data['dist-tags'][tag]));
if (need_change) { if (need_change) {
change = true; change = true;
@ -128,49 +129,67 @@ Storage.prototype.add_version = function(name, version, metadata, tag, callback)
}); });
} }
Storage.prototype.add_tarball = function(name, filename, stream, callback) { Storage.prototype.add_tarball = function(name, filename) {
var stream = through(function(data) {
wstream.write(data);
}, function() {
wstream.end();
});
var self = this; var self = this;
if (name === info_file) { if (name === info_file) {
return callback(new UError({ stream.emit('error', new UError({
status: 403, status: 403,
msg: 'can\'t use this filename' msg: 'can\'t use this filename'
})); }));
} }
var data = new Buffer(0); var wstream = this.storage.write_stream(name + '/' + filename);
stream.on('data', function(d) {
var tmp = data; wstream.on('error', function(err) {
data = new Buffer(tmp.length+d.length); if (err.code === 'EEXISTS') {
tmp.copy(data, 0); stream.emit('error', new UError({
d.copy(data, tmp.length); status: 409,
msg: 'this tarball is already present'
}));
} else {
stream.emit('error', err);
}
}); });
stream.on('end', function(d) {
self.storage.create(name + '/' + filename, data, function(err) { wstream.on('close', function() {
if (err && err.code === 'EEXISTS') { stream.emit('end');
return callback(new UError({
status: 409,
msg: 'this tarball is already present'
}));
}
callback.apply(null, arguments);
});
}); });
return stream;
} }
Storage.prototype.get_tarball = function(name, filename, callback) { Storage.prototype.get_tarball = function(name, filename, callback) {
this.storage.read(name + '/' + filename, function(err) { var stream = through(function(data) {
if (err && err.code === 'ENOENT') { this.queue(data);
return callback(new UError({ }, function() {
status: 404, this.queue(null);
msg: 'no such package available'
}));
}
callback.apply(null, arguments);
}); });
var rstream = this.storage.read_stream(name + '/' + filename);
rstream.on('error', function(err) {
if (err && err.code === 'ENOENT') {
stream.emit('error', new UError({
status: 404,
msg: 'no such file available',
}));
} else {
stream.emit('error', err);
}
});
rstream.on('open', function() {
stream.emit('open');
rstream.pipe(stream);
});
return stream;
} }
Storage.prototype.get_package = function(name, callback) { Storage.prototype.get_package = function(name, callback) {
this.storage.read_json(name + '/' + info_file, function(err) { this.storage.read_json(name + '/' + info_file, function(err, result) {
if (err && err.code === 'ENOENT') { if (err && err.code === 'ENOENT') {
return callback(new UError({ return callback(new UError({
status: 404, status: 404,

View file

@ -1,5 +1,6 @@
var async = require('async'); var async = require('async');
var semver = require('semver'); var semver = require('semver');
var through = require('through');
var UError = require('./error').UserError; var UError = require('./error').UserError;
var Local = require('./st-local'); var Local = require('./st-local');
var Proxy = require('./st-proxy'); var Proxy = require('./st-proxy');
@ -62,30 +63,40 @@ Storage.prototype.add_version = function(name, version, metadata, tag, callback)
this.local.add_version(name, version, metadata, tag, callback); this.local.add_version(name, version, metadata, tag, callback);
} }
Storage.prototype.add_tarball = function(name, filename, stream, callback) { Storage.prototype.add_tarball = function(name, filename) {
this.local.add_tarball(name, filename, stream, callback); return this.local.add_tarball(name, filename);
} }
Storage.prototype.get_tarball = function(name, filename, callback) { Storage.prototype.get_tarball = function(name, filename, callback) {
var stream = through(function(data) {
this.queue(data);
}, function() {
this.queue(null);
});
var self = this; var self = this;
// if someone requesting tarball, it means that we should already have some // if someone requesting tarball, it means that we should already have some
// information about it, so fetching package info is unnecessary // information about it, so fetching package info is unnecessary
// trying local first // trying local first
self.local.get_tarball(name, filename, function(err, results) { var rstream = self.local.get_tarball(name, filename);
if (err && err.status !== 404) return callback(err); var is_open = false;
if (!err && results != null) return callback(err, results); rstream.on('error', function(err) {
if (is_open || err.status !== 404) {
var uplink = null; return stream.emit('error', err);
var err404 = err; }
// local reported 404 // local reported 404
var err404 = err;
var uplink = null;
rstream.destroy();
self.local.get_package(name, function(err, info) { self.local.get_package(name, function(err, info) {
if (err) return callback(err); if (err) return stream.emit('error', err);
if (info._distfiles[filename] == null) { if (info._distfiles[filename] == null) {
return callback(err404); return stream.emit('error', err404);
} }
var file = info._distfiles[filename]; var file = info._distfiles[filename];
@ -102,13 +113,23 @@ Storage.prototype.get_tarball = function(name, filename, callback) {
}, self.config); }, self.config);
} }
uplink.get_tarball(name, filename, function(err, res) { var rstream2 = uplink.get_tarball(name, filename);
if (err) return callback(err); rstream2.on('error', function(err) {
stream.emit('error', err);
return callback(err, res); });
rstream2.on('data', function(data) {
stream.write(data);
});
rstream2.on('end', function() {
stream.end();
}); });
}); });
}); });
rstream.on('open', function() {
is_open = true;
rstream.pipe(stream);
});
return stream;
} }
Storage.prototype.get_package = function(name, callback) { Storage.prototype.get_package = function(name, callback) {
@ -132,6 +153,11 @@ Storage.prototype.get_package = function(name, callback) {
up.get_package(name, function(err, up_res) { up.get_package(name, function(err, up_res) {
if (err) return cb(); if (err) return cb();
if (up === self.local) {
// file exists in local repo
exists = true;
}
try { try {
utils.validate_metadata(up_res, name); utils.validate_metadata(up_res, name);
} catch(err) { } catch(err) {
@ -139,7 +165,8 @@ Storage.prototype.get_package = function(name, callback) {
} }
var this_version = up_res['dist-tags'].latest; var this_version = up_res['dist-tags'].latest;
if (!semver.gt(latest, this_version) && this_version) { if (latest == null
|| (!semver.gt(latest, this_version) && this_version)) {
latest = this_version; latest = this_version;
var is_latest = true; var is_latest = true;
} }

View file

@ -22,7 +22,6 @@ function is_object(obj) {
module.exports.validate_metadata = function(object, name) { module.exports.validate_metadata = function(object, name) {
assert(is_object(object)); assert(is_object(object));
assert.equal(object._id, name);
assert.equal(object.name, name); assert.equal(object.name, name);
if (!is_object(object['dist-tags'])) { if (!is_object(object['dist-tags'])) {