0
Fork 0
mirror of https://github.com/verdaccio/verdaccio.git synced 2025-03-25 02:32:52 -05:00

#103 Refactoring Uplink Storage, Config and Storage class

This commit is contained in:
Juan Picado @jotadeveloper 2017-06-06 23:07:51 +02:00
parent 09ca08baaf
commit fcae1fa91d
No known key found for this signature in database
GPG key ID: 18AC54485952D158
8 changed files with 643 additions and 548 deletions

View file

@ -2,7 +2,7 @@
/* eslint prefer-spread: "off" */
'use strict';
const _ = require('lodash');
const assert = require('assert');
const Crypto = require('crypto');
const Error = require('http-errors');
@ -12,15 +12,18 @@ const pkginfo = require('pkginfo')(module); // eslint-disable-line no-unused-var
const pkgVersion = module.exports.version;
const pkgName = module.exports.name;
const integrityPackages = ['users', 'uplinks', 'packages'];
const integrityProxy = ['http_proxy', 'https_proxy', 'no_proxy'];
/**
* [[a, [b, c]], d] -> [a, b, c, d]
* @param {*} array
* @return {Array}
*/
function flatten(array) {
let result = [];
for (let i=0; i<array.length; i++) {
if (Array.isArray(array[i])) {
const result = [];
for (let i = 0; i < array.length; i++) {
if (_.isArray(array[i])) {
result.push.apply(result, flatten(array[i]));
} else {
result.push(array[i]);
@ -41,6 +44,22 @@ const parse_interval_table = {
'y': 365*86400000,
};
const users = {
'all': true,
'anonymous': true,
'undefined': true,
'owner': true,
'none': true,
};
const check_user_or_uplink = function(arg) {
assert(arg !== 'all' && arg !== 'owner'
&& arg !== 'anonymous' && arg !== 'undefined' && arg !== 'none', 'CONFIG: reserved user/uplink name: ' + arg);
assert(!arg.match(/\s/), 'CONFIG: invalid user name: ' + arg);
assert(users[arg] == null, 'CONFIG: duplicate user/uplink name: ' + arg);
users[arg] = true;
};
/**
* Parse an internal string to number
* @param {*} interval
@ -66,6 +85,29 @@ function parse_interval(interval) {
return result;
}
/**
* Normalise user list.
* @return {Array}
*/
function normalize_userlist() {
const result = [];
for (let i = 0; i<arguments.length; i++) {
if (_.isNil(arguments[i])) {
continue;
}
// if it's a string, split it to array
if (_.isString(arguments[i])) {
result.push(arguments[i].split(/\s+/));
} else if (_.isArray(arguments[i])) {
result.push(arguments[i]);
} else {
throw Error('CONFIG: bad package acl (array or string expected): ' + JSON.stringify(arguments[i]));
}
}
return flatten(result);
}
/**
* Coordinates the application configuration
*/
@ -73,51 +115,35 @@ class Config {
/**
* Constructor
* @param {*} config config the content
* @param {*} defaultConfig config the content
*/
constructor(config) {
constructor(defaultConfig) {
const self = this;
for (let i in config) {
if (self[i] == null) {
self[i] = config[i];
}
}
if (!self.user_agent) {
self.user_agent = `${pkgName}/${pkgVersion}`;
}
// some weird shell scripts are valid yaml files parsed as string
assert.equal(_.isObject(defaultConfig), true, 'CONFIG: it doesn\'t look like a valid config file');
// some weird shell scripts are valid yaml files parsed as string
assert.equal(typeof(config), 'object', 'CONFIG: it doesn\'t look like a valid config file');
this._mixConfigProperties(defaultConfig);
this._setUserAgent();
assert(self.storage, 'CONFIG: storage path not defined');
const users = {
'all': true,
'anonymous': true,
'undefined': true,
'owner': true,
'none': true,
};
const check_user_or_uplink = function(arg) {
assert(arg !== 'all' && arg !== 'owner'
&& arg !== 'anonymous' && arg !== 'undefined' && arg !== 'none', 'CONFIG: reserved user/uplink name: ' + arg);
assert(!arg.match(/\s/), 'CONFIG: invalid user name: ' + arg);
assert(users[arg] == null, 'CONFIG: duplicate user/uplink name: ' + arg);
users[arg] = true;
}
// sanity check for strategic config properties
;['users', 'uplinks', 'packages'].forEach(function(x) {
if (self[x] == null) self[x] = {};
integrityPackages.forEach(function(x) {
if (_.isNil(self[x])) {
self[x] = {};
}
assert(Utils.is_object(self[x]), `CONFIG: bad "${x}" value (object expected)`);
});
// sanity check for users
for (let i in self.users) {
if (Object.prototype.hasOwnProperty.call(self.users, i)) {
check_user_or_uplink(i);
}
}
// sanity check for uplinks
for (let i in self.uplinks) {
if (self.uplinks[i].cache == null) {
@ -127,6 +153,7 @@ class Config {
check_user_or_uplink(i);
}
}
for (let i in self.users) {
if (Object.prototype.hasOwnProperty.call(self.users, i)) {
assert(self.users[i].password, 'CONFIG: no password for user: ' + i);
@ -135,6 +162,7 @@ class Config {
, 'CONFIG: wrong password format for user: ' + i + ', sha1 expected');
}
}
for (let i in self.uplinks) {
if (Object.prototype.hasOwnProperty.call(self.uplinks, i)) {
assert(self.uplinks[i].url, 'CONFIG: no url for uplink: ' + i);
@ -144,73 +172,103 @@ class Config {
}
}
/**
* Normalise user list.
* @return {Array}
*/
function normalize_userlist() {
let result = [];
this._setDefaultPackage();
for (let i=0; i<arguments.length; i++) {
if (arguments[i] == null) continue;
this._safetyCheckPackages();
// if it's a string, split it to array
if (typeof(arguments[i]) === 'string') {
result.push(arguments[i].split(/\s+/));
} else if (Array.isArray(arguments[i])) {
result.push(arguments[i]);
} else {
throw Error('CONFIG: bad package acl (array or string expected): ' + JSON.stringify(arguments[i]));
}
}
return flatten(result);
}
this._loadEnvironmentProxies();
// add a default rule for all packages to make writing plugins easier
if (self.packages['**'] == null) {
self.packages['**'] = {};
}
for (let i in self.packages) {
if (Object.prototype.hasOwnProperty.call(self.packages, i)) {
assert(
typeof(self.packages[i]) === 'object' &&
!Array.isArray(self.packages[i])
, 'CONFIG: bad "'+i+'" package description (object expected)');
self.packages[i].access = normalize_userlist(
self.packages[i].allow_access,
self.packages[i].access
);
delete self.packages[i].allow_access;
self.packages[i].publish = normalize_userlist(
self.packages[i].allow_publish,
self.packages[i].publish
);
delete self.packages[i].allow_publish;
self.packages[i].proxy = normalize_userlist(
self.packages[i].proxy_access,
self.packages[i].proxy
);
delete self.packages[i].proxy_access;
}
}
// loading these from ENV if aren't in config
['http_proxy', 'https_proxy', 'no_proxy'].forEach((function(v) {
if (!(v in self)) {
self[v] = process.env[v] || process.env[v.toUpperCase()];
}
}));
// unique identifier of self server (or a cluster), used to avoid loops
if (!self.server_id) {
self.server_id = Crypto.pseudoRandomBytes(6).toString('hex');
}
this._generateServerId();
}
/**
* Mix the external configuration file.
* @param {object} defaultConfig
* @private
*/
_mixConfigProperties(defaultConfig) {
for (let i in defaultConfig) {
if (_.isNil(this[i])) {
this[i] = defaultConfig[i];
}
}
}
/**
* Set the user agent.
* @private
*/
_setUserAgent() {
if (!this.user_agent) {
this.user_agent = `${pkgName}/${pkgVersion}`;
}
}
/**
*
* @private
*/
_setDefaultPackage() {
// add a default rule for all packages to make writing plugins easier
if (_.isNil(this.packages['**'])) {
this.packages['**'] = {};
}
}
/**
*
* @private
*/
_loadEnvironmentProxies() {
// loading these from ENV if aren't in config
integrityProxy.forEach(((v) => {
if (!(v in this)) {
this[v] = process.env[v] || process.env[v.toUpperCase()];
}
}));
}
/**
* unique identifier of self server (or a cluster), used to avoid loops
* @private
*/
_generateServerId() {
if (!this.server_id) {
this.server_id = Crypto.pseudoRandomBytes(6).toString('hex');
}
}
/**
*
* @private
*/
_safetyCheckPackages() {
for (let i in this.packages) {
if (Object.prototype.hasOwnProperty.call(this.packages, i)) {
// validate integrity packages
assert(_.isObject(this.packages[i]) && _.isArray(this.packages[i]) === false, 'CONFIG: bad "'+i+'" package description (object expected)');
this.packages[i].access = normalize_userlist(
this.packages[i].allow_access,
this.packages[i].access
);
delete this.packages[i].allow_access;
this.packages[i].publish = normalize_userlist(
this.packages[i].allow_publish,
this.packages[i].publish
);
delete this.packages[i].allow_publish;
this.packages[i].proxy = normalize_userlist(
this.packages[i].proxy_access,
this.packages[i].proxy
);
delete this.packages[i].proxy_access;
}
}
}
/**
* Check whether an uplink can proxy
* @param {*} pkg
@ -218,21 +276,21 @@ class Config {
* @return {Boolean}
*/
can_proxy_to(pkg, uplink) {
return (this.get_package_spec(pkg).proxy || []).reduce(function(prev, curr) {
if (uplink === curr) return true;
return prev;
const compatibleProxies = this.get_package_spec(pkg).proxy || [];
return (compatibleProxies).reduce(function(prev, curr) {
return uplink === curr ? true : prev;
}, false);
}
/**
* Check for package spec
* @param {*} pkg
* Check for package spec.
* @param {String} pkgName
* @return {Object}
*/
get_package_spec(pkg) {
for (let i in this.packages) {
if (minimatch.makeRe(i).exec(pkg)) {
return this.packages[i];
get_package_spec(pkgName) {
for (let pkg in this.packages) {
if (minimatch.makeRe(pkg).exec(pkgName)) {
return this.packages[pkg];
}
}
return {};

View file

@ -3,18 +3,32 @@
'use strict';
const fs = require('fs');
const Error = require('http-errors');
const path = require('path');
const createError = require('http-errors');
const mkdirp = require('mkdirp');
const Path = require('path');
const MyStreams = require('./streams');
const MyStream = require('./streams');
const locker = require('./file-locking');
const fileExist = 'EEXISTS';
const noSuchFile = 'ENOENT';
const fSError = function(code) {
const err = Error(code);
const err = createError(code);
err.code = code;
return err;
};
const readFile = function(name) {
return new Promise((resolve, reject) => {
fs.readFile(name, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
};
const tempFile = function(str) {
return `${str}.tmp${String(Math.random()).substr(2)}`;
};
@ -33,7 +47,7 @@ const renameTmp = function(src, dst, _cb) {
// windows can't remove opened file,
// but it seem to be able to rename it
let tmp = tempFile(dst);
const tmp = tempFile(dst);
fs.rename(dst, tmp, function(err) {
fs.rename(src, dst, cb);
if (!err) {
@ -42,20 +56,24 @@ const renameTmp = function(src, dst, _cb) {
});
};
const write = function(dest, data, cb) {
let safe_write = function(cb) {
let tmpname = tempFile(dest);
fs.writeFile(tmpname, data, function(err) {
if (err) return cb(err);
renameTmp(tmpname, dest, cb);
const writeFile = function(dest, data, cb) {
const createTempFile = function(cb) {
const tempFilePath = tempFile(dest);
fs.writeFile(tempFilePath, data, function(err) {
if (err) {
return cb(err);
}
renameTmp(tempFilePath, dest, cb);
});
};
safe_write(function(err) {
if (err && err.code === 'ENOENT') {
mkdirp(Path.dirname(dest), function(err) {
if (err) return cb(err);
safe_write(cb);
createTempFile(function(err) {
if (err && err.code === noSuchFile) {
mkdirp(path.dirname(dest), function(err) {
if (err) {
return cb(err);
}
createTempFile(cb);
});
} else {
cb(err);
@ -63,31 +81,31 @@ const write = function(dest, data, cb) {
});
};
const write_stream = function(name) {
const stream = new MyStreams.UploadTarball();
const createWriteStream = function(name) {
const uploadStream = new MyStream.UploadTarball();
let _ended = 0;
stream.on('end', function() {
uploadStream.on('end', function() {
_ended = 1;
});
fs.exists(name, function(exists) {
if (exists) {
return stream.emit('error', fSError('EEXISTS'));
return uploadStream.emit('error', fSError(fileExist));
}
let tmpname = name + '.tmp-'+String(Math.random()).replace(/^0\./, '');
let file = fs.createWriteStream(tmpname);
const temporalName = `${name}.tmp-${String(Math.random()).replace(/^0\./, '')}`;
const file = fs.createWriteStream(temporalName);
let opened = false;
stream.pipe(file);
uploadStream.pipe(file);
stream.done = function() {
uploadStream.done = function() {
const onend = function() {
file.on('close', function() {
renameTmp(tmpname, name, function(err) {
renameTmp(temporalName, name, function(err) {
if (err) {
stream.emit('error', err);
uploadStream.emit('error', err);
} else {
stream.emit('success');
uploadStream.emit('success');
}
});
});
@ -96,14 +114,14 @@ const write_stream = function(name) {
if (_ended) {
onend();
} else {
stream.on('end', onend);
uploadStream.on('end', onend);
}
};
stream.abort = function() {
uploadStream.abort = function() {
if (opened) {
opened = false;
file.on('close', function() {
fs.unlink(tmpname, function() {});
fs.unlink(temporalName, function() {});
});
}
file.destroySoon();
@ -111,66 +129,56 @@ const write_stream = function(name) {
file.on('open', function() {
opened = true;
// re-emitting open because it's handled in storage.js
stream.emit('open');
uploadStream.emit('open');
});
file.on('error', function(err) {
stream.emit('error', err);
uploadStream.emit('error', err);
});
});
return stream;
return uploadStream;
};
const read_stream = function(name, stream, callback) {
let rstream = fs.createReadStream(name);
rstream.on('error', function(err) {
stream.emit('error', err);
const createReadStream = function(name, readTarballStream, callback) {
let readStream = fs.createReadStream(name);
readStream.on('error', function(err) {
readTarballStream.emit('error', err);
});
rstream.on('open', function(fd) {
readStream.on('open', function(fd) {
fs.fstat(fd, function(err, stats) {
if (err) return stream.emit('error', err);
stream.emit('content-length', stats.size);
stream.emit('open');
rstream.pipe(stream);
if (err) return readTarballStream.emit('error', err);
readTarballStream.emit('content-length', stats.size);
readTarballStream.emit('open');
readStream.pipe(readTarballStream);
});
});
stream = new MyStreams.ReadTarball();
stream.abort = function() {
rstream.close();
readTarballStream = new MyStream.ReadTarball();
readTarballStream.abort = function() {
readStream.close();
};
return stream;
return readTarballStream;
};
const create = function(name, contents, callback) {
const createFile = function(name, contents, callback) {
fs.exists(name, function(exists) {
if (exists) {
return callback( fSError('EEXISTS') );
return callback( fSError(fileExist) );
}
write(name, contents, callback);
writeFile(name, contents, callback);
});
};
const update = function(name, contents, callback) {
const updateFile = function(name, contents, callback) {
fs.exists(name, function(exists) {
if (!exists) {
return callback( fSError('ENOENT') );
return callback( fSError(noSuchFile) );
}
write(name, contents, callback);
writeFile(name, contents, callback);
});
};
const read = function(name, callback) {
fs.readFile(name, callback);
};
module.exports.read = read;
module.exports.read_json = function(name, cb) {
read(name, function(err, res) {
if (err) {
return cb(err);
}
const readJSON = function(name, cb) {
readFile(name).then(function(res) {
let args = [];
try {
args = [null, JSON.parse(res.toString('utf8'))];
@ -178,10 +186,12 @@ module.exports.read_json = function(name, cb) {
args = [err];
}
cb.apply(null, args);
}, function(err) {
return cb(err);
});
};
module.exports.lock_and_read = function(name, cb) {
const lock_and_read = function(name, cb) {
locker.readFile(name, {lock: true}, function(err, res) {
if (err) {
return cb(err);
@ -190,7 +200,7 @@ module.exports.lock_and_read = function(name, cb) {
});
};
module.exports.lock_and_read_json = function(name, cb) {
const lockAndReadJSON = function(name, cb) {
locker.readFile(name, {lock: true, parse: true}, function(err, res) {
if (err) {
return cb(err);
@ -199,33 +209,45 @@ module.exports.lock_and_read_json = function(name, cb) {
});
};
module.exports.unlock_file = function(name, cb) {
const unlock_file = function(name, cb) {
locker.unlockFile(name, cb);
};
module.exports.create = create;
module.exports.create_json = function(name, value, cb) {
create(name, JSON.stringify(value, null, '\t'), cb);
const createJSON = function(name, value, cb) {
createFile(name, JSON.stringify(value, null, '\t'), cb);
};
module.exports.update = update;
module.exports.update_json = function(name, value, cb) {
update(name, JSON.stringify(value, null, '\t'), cb);
const updateJSON = function(name, value, cb) {
updateFile(name, JSON.stringify(value, null, '\t'), cb);
};
module.exports.write = write;
module.exports.write_json = function(name, value, cb) {
write(name, JSON.stringify(value, null, '\t'), cb);
const writeJSON = function(name, value, cb) {
writeFile(name, JSON.stringify(value, null, '\t'), cb);
};
module.exports.write_stream = write_stream;
module.exports.read_stream = read_stream;
// fs
module.exports.unlink = fs.unlink;
module.exports.rmdir = fs.rmdir;
// streams
module.exports.createWriteStream = createWriteStream;
module.exports.createReadStream = createReadStream;
// io
module.exports.read = readFile;
module.exports.write = writeFile;
module.exports.update = updateFile;
module.exports.create = createFile;
// json
module.exports.readJSON = readJSON;
module.exports.lockAndReadJSON = lockAndReadJSON;
module.exports.writeJSON = writeJSON;
module.exports.updateJSON = updateJSON;
module.exports.createJSON = createJSON;
// lock
module.exports.unlock_file = unlock_file;
module.exports.lock_and_read = lock_and_read;

View file

@ -1,22 +1,26 @@
/* eslint prefer-rest-params: "off" */
/* eslint prefer-spread: "off" */
'use strict';
const assert = require('assert');
const async = require('async');
const Crypto = require('crypto');
const fs = require('fs');
const Error = require('http-errors');
const Path = require('path');
const Stream = require('stream');
const URL = require('url');
const url = require('url');
const fs_storage = require('./local-fs');
const async = require('async');
const createError = require('http-errors');
const _ = require('lodash');
const fsStorage = require('./local-fs');
const LocalData = require('./local-data');
const Logger = require('./logger');
const MyStreams = require('./streams');
const customStream = require('./streams');
const Utils = require('./utils');
const info_file = 'package.json';
const pkgFileName = 'package.json';
const fileExist = 'EEXISTS';
const noSuchFile = 'ENOENT';
const resourceNotAvailable = 'EAGAIN';
// returns the minimal package file
const get_boilerplate = function(name) {
@ -63,12 +67,12 @@ class Storage {
addPackage(name, info, callback) {
const storage = this.storage(name);
if (!storage) {
return callback( Error[404]('this package cannot be added'));
return callback( createError(404, 'this package cannot be added'));
}
storage.create_json(info_file, get_boilerplate(name), function(err) {
if (err && err.code === 'EEXISTS') {
return callback( Error[409]('this package is already present') );
storage.createJSON(pkgFileName, get_boilerplate(name), function(err) {
if (err && err.code === fileExist) {
return callback( createError(409, 'this package is already present'));
}
const latest = info['dist-tags'].latest;
if (latest && info.versions[latest]) {
@ -85,25 +89,24 @@ class Storage {
* @return {Function}
*/
removePackage(name, callback) {
this.logger.info( {name: name}
, 'unpublishing @{name} (all)');
this.logger.info( {name: name}, 'unpublishing @{name} (all)');
let storage = this.storage(name);
if (!storage) {
return callback( Error[404]('no such package available') );
return callback( createError(404, 'no such package available'));
}
storage.read_json(info_file, (err, data) => {
storage.readJSON(pkgFileName, (err, data) => {
if (err) {
if (err.code === 'ENOENT') {
return callback( Error[404]('no such package available') );
if (err.code === noSuchFile) {
return callback( createError(404, 'no such package available'));
} else {
return callback(err);
}
}
this._normalizePackage(data);
storage.unlink(info_file, function(err) {
storage.unlink(pkgFileName, function(err) {
if (err) {
return callback(err);
}
@ -146,7 +149,7 @@ class Storage {
let change = false;
for (let ver in newdata.versions) {
if (data.versions[ver] == null) {
if (_.isNil(data.versions[ver])) {
let verdata = newdata.versions[ver];
// we don't keep readmes for package versions,
@ -157,9 +160,9 @@ class Storage {
data.versions[ver] = verdata;
if (verdata.dist && verdata.dist.tarball) {
let filename = URL.parse(verdata.dist.tarball).pathname.replace(/^.*\//, '');
let filename = url.parse(verdata.dist.tarball).pathname.replace(/^.*\//, '');
// we do NOT overwrite any existing records
if (data._distfiles[filename] == null) {
if (_.isNil(data._distfiles[filename])) {
let hash = data._distfiles[filename] = {
url: verdata.dist.tarball,
sha: verdata.dist.shasum,
@ -170,12 +173,12 @@ class Storage {
// use the same protocol for the tarball
//
// see https://github.com/rlidwka/sinopia/issues/166
let tarball_url = URL.parse(hash.url);
let uplink_url = URL.parse(this.config.uplinks[verdata._verdaccio_uplink].url);
const tarball_url = url.parse(hash.url);
const uplink_url = url.parse(this.config.uplinks[verdata._verdaccio_uplink].url);
if (uplink_url.host === tarball_url.host) {
tarball_url.protocol = uplink_url.protocol;
hash.registry = verdata._verdaccio_uplink;
hash.url = URL.format(tarball_url);
hash.url = url.format(tarball_url);
}
}
}
@ -231,7 +234,7 @@ class Storage {
delete metadata.readme;
if (data.versions[version] != null) {
return cb( Error[409]('this version already present') );
return cb( createError[409]('this version already present') );
}
// if uploaded tarball has a different shasum, it's very likely that we have some kind of error
@ -240,7 +243,7 @@ class Storage {
if (Utils.is_object(data._attachments[tarball])) {
if (data._attachments[tarball].shasum != null && metadata.dist.shasum != null) {
if (data._attachments[tarball].shasum != metadata.dist.shasum) {
return cb( Error[400]('shasum error, '
return cb( createError[400]('shasum error, '
+ data._attachments[tarball].shasum
+ ' != ' + metadata.dist.shasum) );
}
@ -270,8 +273,8 @@ class Storage {
continue;
}
// be careful here with == (cast)
if (data.versions[tags[t]] == null) {
return cb( Error[404]('this version doesn\'t exist') );
if (_.isNil(data.versions[tags[t]])) {
return cb( createError[404]('this version doesn\'t exist') );
}
Utils.tag_version(data, tags[t], t);
@ -291,13 +294,13 @@ class Storage {
data['dist-tags'] = {};
for (let t in tags) {
if (tags[t] === null) {
if (_.isNull(tags[t])) {
delete data['dist-tags'][t];
continue;
}
if (data.versions[tags[t]] == null) {
return cb( Error[404]('this version doesn\'t exist') );
if (_.isNil(data.versions[tags[t]])) {
return cb( createError[404]('this version doesn\'t exist') );
}
Utils.tag_version(data, tags[t], t);
@ -317,12 +320,12 @@ class Storage {
*/
changePackage(name, metadata, revision, callback) {
if (!Utils.is_object(metadata.versions) || !Utils.is_object(metadata['dist-tags'])) {
return callback( Error[422]('bad data') );
return callback( createError[422]('bad data') );
}
this._updatePackage(name, (data, cb) => {
for (let ver in data.versions) {
if (metadata.versions[ver] == null) {
if (_.isNil(metadata.versions[ver])) {
this.logger.info( {name: name, version: ver},
'unpublishing @{name}@@{version}');
delete data.versions[ver];
@ -358,7 +361,7 @@ class Storage {
delete data._attachments[filename];
cb();
} else {
cb(Error[404]('no such file available'));
cb(createError[404]('no such file available'));
}
}, (err) => {
if (err) {
@ -375,90 +378,95 @@ class Storage {
* Add a tarball.
* @param {String} name
* @param {String} filename
* @return {Function}
* @return {Stream}
*/
addTarball(name, filename) {
assert(Utils.validate_name(filename));
let stream = new MyStreams.UploadTarball();
let _transform = stream._transform;
let length = 0;
let shasum = Crypto.createHash('sha1');
const shaOneHash = Crypto.createHash('sha1');
const uploadStream = new customStream.UploadTarball();
const _transform = uploadStream._transform;
const storage = this.storage(name);
uploadStream.abort = function() {};
uploadStream.done = function() {};
stream.abort = stream.done = function() {};
stream._transform = function(data) {
shasum.update(data);
uploadStream._transform = function(data) {
shaOneHash.update(data);
// measure the length for validation reasons
length += data.length;
_transform.apply(stream, arguments);
_transform.apply(uploadStream, arguments);
};
if (name === info_file || name === '__proto__') {
if (name === pkgFileName || name === '__proto__') {
process.nextTick(function() {
stream.emit('error', Error[403]('can\'t use this filename'));
uploadStream.emit('error', createError[403]('can\'t use this filename'));
});
return stream;
return uploadStream;
}
let storage = this.storage(name);
if (!storage) {
process.nextTick(function() {
stream.emit('error', Error[404]('can\'t upload this package'));
uploadStream.emit('error', createError[404]('can\'t upload this package'));
});
return stream;
return uploadStream;
}
let wstream = storage.write_stream(filename);
const writeStream = storage.createWriteStream(filename);
wstream.on('error', (err) => {
if (err.code === 'EEXISTS') {
stream.emit('error', Error[409]('this tarball is already present'));
} else if (err.code === 'ENOENT') {
writeStream.on('error', (err) => {
if (err.code === fileExist) {
uploadStream.emit('error', createError[409]('this tarball is already present'));
} else if (err.code === noSuchFile) {
// check if package exists to throw an appropriate message
this.getPackage(name, function(_err, res) {
if (_err) {
stream.emit('error', _err);
uploadStream.emit('error', _err);
} else {
stream.emit('error', err);
uploadStream.emit('error', err);
}
});
} else {
stream.emit('error', err);
uploadStream.emit('error', err);
}
});
wstream.on('open', function() {
writeStream.on('open', function() {
// re-emitting open because it's handled in storage.js
stream.emit('open');
uploadStream.emit('open');
});
wstream.on('success', () => {
writeStream.on('success', () => {
this._updatePackage(name, function updater(data, cb) {
data._attachments[filename] = {
shasum: shasum.digest('hex'),
shasum: shaOneHash.digest('hex'),
};
cb();
}, function(err) {
if (err) {
stream.emit('error', err);
uploadStream.emit('error', err);
} else {
stream.emit('success');
uploadStream.emit('success');
}
});
});
stream.abort = function() {
wstream.abort();
uploadStream.abort = function() {
writeStream.abort();
};
stream.done = function() {
uploadStream.done = function() {
if (!length) {
stream.emit('error', Error[422]('refusing to accept zero-length file'));
wstream.abort();
uploadStream.emit('error', createError[422]('refusing to accept zero-length file'));
writeStream.abort();
} else {
wstream.done();
writeStream.done();
}
};
stream.pipe(wstream);
return stream;
uploadStream.pipe(writeStream);
return uploadStream;
}
/**
@ -470,7 +478,7 @@ class Storage {
*/
getTarball(name, filename, callback) {
assert(Utils.validate_name(filename));
const stream = new MyStreams.ReadTarball();
const stream = new customStream.ReadTarball();
stream.abort = function() {
if (rstream) {
rstream.abort();
@ -480,15 +488,15 @@ class Storage {
let storage = this.storage(name);
if (!storage) {
process.nextTick(function() {
stream.emit('error', Error[404]('no such file available'));
stream.emit('error', createError[404]('no such file available'));
});
return stream;
}
/* eslint no-var: "off" */
var rstream = storage.read_stream(filename);
var rstream = storage.createReadStream(filename);
rstream.on('error', function(err) {
if (err && err.code === 'ENOENT') {
stream.emit('error', Error(404, 'no such file available'));
if (err && err.code === noSuchFile) {
stream.emit('error', createError(404, 'no such file available'));
} else {
stream.emit('error', err);
}
@ -512,21 +520,21 @@ class Storage {
* @return {Function}
*/
getPackage(name, options, callback) {
if (typeof(options) === 'function') {
callback = options, options = {};
if (_.isFunction(options)) {
callback = options || {};
}
let storage = this.storage(name);
if (!storage) {
return callback( Error[404]('no such package available') );
return callback( createError[404]('no such package available') );
}
storage.read_json(info_file, (err, result) => {
storage.readJSON(pkgFileName, (err, result) => {
if (err) {
if (err.code === 'ENOENT') {
return callback( Error[404]('no such package available') );
if (err.code === noSuchFile) {
return callback( createError[404]('no such package available') );
} else {
return callback(this._internalError(err, info_file, 'error reading'));
return callback(this._internalError(err, pkgFileName, 'error reading'));
}
}
this._normalizePackage(result);
@ -555,23 +563,23 @@ class Storage {
return cb(err);
}
let versions = Utils.semver_sort(Object.keys(data.versions));
let latest = data['dist-tags'] && data['dist-tags'].latest ? data['dist-tags'].latest : versions.pop();
const versions = Utils.semver_sort(Object.keys(data.versions));
const latest = data['dist-tags'] && data['dist-tags'].latest ? data['dist-tags'].latest : versions.pop();
if (data.versions[latest]) {
const version = data.versions[latest];
stream.push({
'name': data.versions[latest].name,
'description': data.versions[latest].description,
'name': version.name,
'description': version.description,
'dist-tags': {latest: latest},
'maintainers': data.versions[latest].maintainers
|| [data.versions[latest].author].filter(Boolean),
'author': data.versions[latest].author,
'repository': data.versions[latest].repository,
'readmeFilename': data.versions[latest].readmeFilename || '',
'homepage': data.versions[latest].homepage,
'keywords': data.versions[latest].keywords,
'bugs': data.versions[latest].bugs,
'license': data.versions[latest].license,
'maintainers': version.maintainers || [version.author].filter(Boolean),
'author': version.author,
'repository': version.repository,
'readmeFilename': version.readmeFilename || '',
'homepage': version.homepage,
'keywords': version.keywords,
'bugs': version.bugs,
'license': version.license,
'time': {
modified: item.time ? new Date(item.time).toISOString() : undefined,
},
@ -600,12 +608,11 @@ class Storage {
*/
storage(pkg) {
let path = this.config.get_package_spec(pkg).storage;
if (path == null) {
if (_.isNil(path)) {
path = this.config.storage;
}
if (path == null || path === false) {
this.logger.debug( {name: pkg}
, 'this package has no storage defined: @{name}' );
if (_.isNil(path) || path === false) {
this.logger.debug( {name: pkg}, 'this package has no storage defined: @{name}' );
return null;
}
return new PathWrapper(
@ -618,10 +625,10 @@ class Storage {
/**
* Walks through each package and calls `on_package` on them.
* @param {*} on_package
* @param {*} onPackage
* @param {*} on_end
*/
_eachPackage(on_package, on_end) {
_eachPackage(onPackage, on_end) {
let storages = {};
storages[this.config.storage] = true;
@ -650,7 +657,7 @@ class Storage {
async.eachSeries(files, function(file2, cb) {
if (Utils.validate_name(file2)) {
on_package({
onPackage({
name: `${file}/${file2}`,
path: Path.resolve(base, storage, file, file2),
}, cb);
@ -660,7 +667,7 @@ class Storage {
}, cb);
});
} else if (Utils.validate_name(file)) {
on_package({
onPackage({
name: file,
path: Path.resolve(base, storage, file),
}, cb);
@ -670,6 +677,10 @@ class Storage {
}, cb);
});
}, on_end);
// Object.keys(storages).reduce(() => {
//
// }, Promise.resolve());
}
/**
@ -698,18 +709,18 @@ class Storage {
_readCreatePackage(name, callback) {
const storage = this.storage(name);
if (!storage) {
let data = get_boilerplate(name);
const data = get_boilerplate(name);
this._normalizePackage(data);
return callback(null, data);
}
storage.read_json(info_file, (err, data) => {
storage.readJSON(pkgFileName, (err, data) => {
// TODO: race condition
if (err) {
if (err.code === 'ENOENT') {
if (err.code === noSuchFile) {
// if package doesn't exist, we create it here
data = get_boilerplate(name);
} else {
return callback(this._internalError(err, info_file, 'error reading'));
return callback(this._internalError(err, pkgFileName, 'error reading'));
}
}
this._normalizePackage(data);
@ -727,7 +738,7 @@ class Storage {
_internalError(err, file, message) {
this.logger.error( {err: err, file: file},
message + ' @{file}: @{!err.message}' );
return Error[500]();
return createError[500]();
}
/**
@ -747,16 +758,16 @@ class Storage {
_updatePackage(name, updateFn, _callback) {
const storage = this.storage(name);
if (!storage) {
return _callback( Error[404]('no such package available') );
return _callback( createError[404]('no such package available') );
}
storage.lock_and_read_json(info_file, (err, json) => {
storage.lockAndReadJSON(pkgFileName, (err, json) => {
let locked = false;
// callback that cleans up lock first
const callback = function(err) {
let _args = arguments;
if (locked) {
storage.unlock_file(info_file, function() {
storage.unlock_file(pkgFileName, function() {
// ignore any error from the unlock
_callback.apply(err, _args);
});
@ -770,10 +781,10 @@ class Storage {
}
if (err) {
if (err.code === 'EAGAIN') {
return callback( Error[503]('resource temporarily unavailable') );
} else if (err.code === 'ENOENT') {
return callback( Error[404]('no such package available') );
if (err.code === resourceNotAvailable) {
return callback( createError[503]('resource temporarily unavailable') );
} else if (err.code === noSuchFile) {
return callback( createError[404]('no such package available') );
} else {
return callback(err);
}
@ -801,14 +812,14 @@ class Storage {
if (typeof(json._rev) !== 'string') {
json._rev = '0-0000000000000000';
}
let rev = json._rev.split('-');
const rev = json._rev.split('-');
json._rev = ((+rev[0] || 0) + 1) + '-' + Crypto.pseudoRandomBytes(8).toString('hex');
let storage = this.storage(name);
if (!storage) {
return callback();
}
storage.write_json(info_file, json, callback);
storage.writeJSON(pkgFileName, json, callback);
}
}
@ -831,12 +842,12 @@ const PathWrapper = (function() {
let args = Array.prototype.slice.apply(arguments);
/* eslint no-invalid-this: off */
args[0] = Path.join(this.path, args[0] || '');
return fs_storage[method].apply(null, args);
return fsStorage[method].apply(null, args);
};
};
for (let i in fs_storage) {
if (fs_storage.hasOwnProperty(i)) {
for (let i in fsStorage) {
if (fsStorage.hasOwnProperty(i)) {
Wrapper.prototype[i] = wrapLocalStorageMethods(i);
}
}

View file

@ -2,7 +2,8 @@
const Handlebars = require('handlebars');
const request = require('request');
const Logger = require('./logger');
const _ = require('lodash');
const logger = require('./logger');
const handleNotify = function(metadata, notifyEntry) {
let regex;
@ -16,10 +17,12 @@ const handleNotify = function(metadata, notifyEntry) {
const template = Handlebars.compile(notifyEntry.content);
const content = template( metadata );
const options = {body: content};
const options = {
body: content,
};
// provides fallback support, it's accept an Object {} and Array of {}
if (notifyEntry.headers && Array.isArray(notifyEntry.headers)) {
if (notifyEntry.headers && _.isArray(notifyEntry.headers)) {
const header = {};
notifyEntry.headers.map(function(item) {
if (Object.is(item, item)) {
@ -43,17 +46,17 @@ const handleNotify = function(metadata, notifyEntry) {
request(options, function(err, response, body) {
if (err) {
Logger.logger.error({err: err}, ' notify error: @{err.message}' );
logger.logger.error({err: err}, ' notify error: @{err.message}' );
} else {
Logger.logger.info({content: content}, 'A notification has been shipped: @{content}');
logger.logger.info({content: content}, 'A notification has been shipped: @{content}');
if (body) {
Logger.logger.debug({body: body}, ' body: @{body}' );
logger.logger.debug({body: body}, ' body: @{body}' );
}
}
});
};
module.exports.notify = function(metadata, config) {
const notify = function(metadata, config) {
if (config.notify) {
if (config.notify.content) {
handleNotify(metadata, config.notify);
@ -66,3 +69,5 @@ module.exports.notify = function(metadata, config) {
}
}
};
module.exports.notify = notify;

View file

@ -299,7 +299,7 @@ class Storage {
function serve_file(file) {
let uplink = null;
for (let p in self.uplinks) {
if (self.uplinks[p].can_fetch_url(file.url)) {
if (self.uplinks[p].isUplinkValid(file.url)) {
uplink = self.uplinks[p];
}
}
@ -424,7 +424,7 @@ class Storage {
return cb();
}
// search by keyword for each uplink
let lstream = self.uplinks[up_name].search(startkey, options);
let lstream = self.uplinks[up_name].search(options);
// join streams
lstream.pipe(stream, {end: false});
lstream.on('error', function(err) {
@ -537,7 +537,7 @@ class Storage {
_options.etag = pkginfo._uplinks[up.upname].etag;
}
up.get_package(name, _options, function(err, up_res, etag) {
up.getRemotePackage(name, _options, function(err, up_res, etag) {
if (err && err.status === 304) {
pkginfo._uplinks[up.upname].fetched = Date.now();
}

View file

@ -5,7 +5,7 @@ const Stream = require('stream');
/**
* This stream is used to read tarballs from repository.
* @param {*} options
* @return {Object}
* @return {Stream}
*/
class ReadTarball extends Stream.PassThrough {
@ -23,7 +23,7 @@ class ReadTarball extends Stream.PassThrough {
/**
* This stream is used to upload tarballs to a repository.
* @param {*} options
* @return {Object}
* @return {Stream}
*/
class UploadTarball extends Stream.PassThrough {

View file

@ -1,11 +1,12 @@
'use strict';
const JSONStream = require('JSONStream');
const Error = require('http-errors');
const createError = require('http-errors');
const _ = require('lodash');
const request = require('request');
const Stream = require('readable-stream');
const Stream = require('stream');
const URL = require('url');
const parse_interval = require('./config').parse_interval;
const parseInterval = require('./config').parse_interval;
const Logger = require('./logger');
const MyStreams = require('./streams');
const Utils = require('./utils');
@ -13,29 +14,42 @@ const zlib = require('zlib');
const encode = function(thing) {
return encodeURIComponent(thing).replace(/^%40/, '@');
};
/**
* Just a helper (`config[key] || default` doesn't work because of zeroes)
* @param {Object} config
* @param {Object} key
* @param {Object} def
* @return {String}
*/
const setConfig = (config, key, def) => {
return _.isNil(config[key]) === false ? config[key] : def;
};
/**
* Implements Storage interface
* (same for storage.js, local-storage.js, up-storage.js)
*/
class Storage {
class ProxyStorage {
/**
* Constructor
* @param {*} config
* @param {*} mainconfig
* @param {*} mainConfig
*/
constructor(config, mainconfig) {
constructor(config, mainConfig) {
this.config = config;
this.failed_requests = 0;
this.userAgent = mainconfig.user_agent;
this.userAgent = mainConfig.user_agent;
this.ca = config.ca;
this.logger = Logger.logger.child({sub: 'out'});
this.server_id = mainconfig.server_id;
this.server_id = mainConfig.server_id;
this.url = URL.parse(this.config.url);
this._setupProxy(this.url.hostname, config, mainconfig, this.url.protocol === 'https:');
this._setupProxy(this.url.hostname, config, mainConfig, this.url.protocol === 'https:');
this.config.url = this.config.url.replace(/\/$/, '');
if (Number(this.config.timeout) >= 1000) {
this.logger.warn(['Too big timeout value: ' + this.config.timeout,
'We changed time format to nginx-like one',
@ -44,75 +58,10 @@ class Storage {
}
// a bunch of different configurable timers
this.maxage = parse_interval(config_get('maxage', '2m' ));
this.timeout = parse_interval(config_get('timeout', '30s'));
this.max_fails = Number(config_get('max_fails', 2 ));
this.fail_timeout = parse_interval(config_get('fail_timeout', '5m' ));
return this;
/**
* Just a helper (`config[key] || default` doesn't work because of zeroes)
* @param {*} key
* @param {*} def
* @return {String}
*/
function config_get(key, def) {
return config[key] != null ? config[key] : def;
}
}
/**
* Set up a proxy.
* @param {*} hostname
* @param {*} config
* @param {*} mainconfig
* @param {*} isHTTPS
*/
_setupProxy(hostname, config, mainconfig, isHTTPS) {
let no_proxy;
let proxy_key = isHTTPS ? 'https_proxy' : 'http_proxy';
// get http_proxy and no_proxy configs
if (proxy_key in config) {
this.proxy = config[proxy_key];
} else if (proxy_key in mainconfig) {
this.proxy = mainconfig[proxy_key];
}
if ('no_proxy' in config) {
no_proxy = config.no_proxy;
} else if ('no_proxy' in mainconfig) {
no_proxy = mainconfig.no_proxy;
}
// use wget-like algorithm to determine if proxy shouldn't be used
if (hostname[0] !== '.') {
hostname = '.' + hostname;
}
if (typeof(no_proxy) === 'string' && no_proxy.length) {
no_proxy = no_proxy.split(',');
}
if (Array.isArray(no_proxy)) {
for (let i=0; i<no_proxy.length; i++) {
let no_proxy_item = no_proxy[i];
if (no_proxy_item[0] !== '.') no_proxy_item = '.' + no_proxy_item;
if (hostname.lastIndexOf(no_proxy_item) === hostname.length - no_proxy_item.length) {
if (this.proxy) {
this.logger.debug({url: this.url.href, rule: no_proxy_item},
'not using proxy for @{url}, excluded by @{rule} rule');
this.proxy = false;
}
break;
}
}
}
// if it's non-string (i.e. "false"), don't use it
if (typeof(this.proxy) !== 'string') {
delete this.proxy;
} else {
this.logger.debug( {url: this.url.href, proxy: this.proxy}
, 'using proxy @{proxy} for @{url}' );
}
this.maxage = parseInterval(setConfig(this.config, 'maxage', '2m' ));
this.timeout = parseInterval(setConfig(this.config, 'timeout', '30s'));
this.max_fails = Number(setConfig(this.config, 'max_fails', 2 ));
this.fail_timeout = parseInterval(setConfig(this.config, 'fail_timeout', '5m' ));
}
/**
@ -123,16 +72,18 @@ class Storage {
*/
request(options, cb) {
let json;
if (!this.status_check()) {
let req = new Stream.Readable();
if (this._statusCheck() === false) {
let streamRead = new Stream.Readable();
process.nextTick(function() {
if (typeof(cb) === 'function') cb(Error('uplink is offline'));
req.emit('error', Error('uplink is offline'));
if (_.isFunction(cb)) {
cb(createError('uplink is offline'));
}
streamRead.emit('error', createError('uplink is offline'));
});
req._read = function() {};
streamRead._read = function() {};
// preventing 'Uncaught, unspecified "error" event'
req.on('error', function() {});
return req;
streamRead.on('error', function() {});
return streamRead;
}
let self = this;
@ -141,7 +92,7 @@ class Storage {
headers['Accept-Encoding'] = headers['Accept-Encoding'] || 'gzip';
// registry.npmjs.org will only return search result if user-agent include string 'npm'
headers['User-Agent'] = headers['User-Agent'] || `npm (${this.userAgent})`;
this._add_proxy_headers(options.req, headers);
this._addProxyHeaders(options.req, headers);
// add/override headers specified in the config
for (let key in this.config.headers) {
@ -150,8 +101,8 @@ class Storage {
}
}
let method = options.method || 'GET';
let uri = options.uri_full || (this.config.url + options.uri);
const method = options.method || 'GET';
const uri = options.uri_full || (this.config.url + options.uri);
self.logger.info({
method: method,
@ -164,18 +115,18 @@ class Storage {
headers['Content-Type'] = headers['Content-Type'] || 'application/json';
}
let request_callback = cb ? (function(err, res, body) {
let requestCallback = cb ? (function(err, res, body) {
let error;
let res_length = err ? 0 : body.length;
const responseLength = err ? 0 : body.length;
do_decode();
do_log();
processBody(err, body);
logActivity();
cb(err, res, body);
/**
* Perform a decode.
*/
function do_decode() {
function processBody() {
if (err) {
error = err.message;
return;
@ -192,7 +143,7 @@ class Storage {
}
if (!err && Utils.is_object(body)) {
if (typeof(body.error) === 'string') {
if (_.isString(body.error)) {
error = body.error;
}
}
@ -200,7 +151,7 @@ class Storage {
/**
* Perform a log.
*/
function do_log() {
function logActivity() {
let message = '@{!status}, req: \'@{request.method} @{request.url}\'';
message += error
? ', error: @{!error}'
@ -213,7 +164,7 @@ class Storage {
error: error,
bytes: {
in: json ? json.length : 0,
out: res_length || 0,
out: responseLength || 0,
},
}, message);
}
@ -229,74 +180,46 @@ class Storage {
encoding: null,
gzip: true,
timeout: this.timeout,
}, request_callback);
}, requestCallback);
let status_called = false;
let statusCalled = false;
req.on('response', function(res) {
if (!req._verdaccio_aborted && !status_called) {
status_called = true;
self.status_check(true);
if (!req._verdaccio_aborted && _.isNil(statusCalled) === false) {
statusCalled = true;
self._statusCheck(true);
}
if (!request_callback) {
if (_.isNil(requestCallback) === false) {
(function do_log() {
let message = '@{!status}, req: \'@{request.method} @{request.url}\' (streaming)';
const message = '@{!status}, req: \'@{request.method} @{request.url}\' (streaming)';
self.logger.warn({
request: {method: method, url: uri},
request: {
method: method,
url: uri,
},
level: 35, // http
status: res != null ? res.statusCode : 'ERR',
status: _.isNull(res) === false ? res.statusCode : 'ERR',
}, message);
})();
}
});
req.on('error', function(_err) {
if (!req._verdaccio_aborted && !status_called) {
status_called = true;
self.status_check(false);
if (!req._verdaccio_aborted && !statusCalled) {
statusCalled = true;
self._statusCheck(false);
}
});
return req;
}
/**
* Check whether the remote host is available.
* @param {*} alive
* @return {Boolean}
*/
status_check(alive) {
if (arguments.length === 0) {
if (this.failed_requests >= this.max_fails
&& Math.abs(Date.now() - this.last_request_time) < this.fail_timeout) {
return false;
} else {
return true;
}
} else {
if (alive) {
if (this.failed_requests >= this.max_fails) {
this.logger.warn({host: this.url.host}, 'host @{host} is back online');
}
this.failed_requests = 0;
} else {
this.failed_requests++;
if (this.failed_requests === this.max_fails) {
this.logger.warn({host: this.url.host}, 'host @{host} is now offline');
}
}
this.last_request_time = Date.now();
}
}
/**
* Determine whether can fetch from the provided URL.
* @param {*} url
* @return {Boolean}
*/
can_fetch_url(url) {
isUplinkValid(url) {
url = URL.parse(url);
return url.protocol === this.url.protocol
&& url.host === this.url.host
&& url.path.indexOf(this.url.path) === 0;
return url.protocol === this.url.protocol && url.host === this.url.host && url.path.indexOf(this.url.path) === 0;
}
/**
@ -305,13 +228,9 @@ class Storage {
* @param {*} options
* @param {*} callback
*/
get_package(name, options, callback) {
if (typeof(options) === 'function') {
callback = options;
options = {};
}
getRemotePackage(name, options, callback) {
const headers = {};
if (options.etag) {
if (_.isNil(options.etag) === false) {
headers['If-None-Match'] = options.etag;
headers['Accept'] = 'application/octet-stream';
}
@ -321,13 +240,15 @@ class Storage {
json: true,
headers: headers,
req: options.req,
}, function(err, res, body) {
if (err) return callback(err);
}, (err, res, body) => {
if (err) {
return callback(err);
}
if (res.statusCode === 404) {
return callback( Error[404]('package doesn\'t exist on uplink') );
return callback( createError[404]('package doesn\'t exist on uplink') );
}
if (!(res.statusCode >= 200 && res.statusCode < 300)) {
let error = Error('bad status code: ' + res.statusCode);
const error = createError(`bad status code: ${res.statusCode}`);
error.remoteStatus = res.statusCode;
return callback(error);
}
@ -335,75 +256,61 @@ class Storage {
});
}
/**
* Retrieve a tarball.
* @param {*} name
* @param {*} options
* @param {*} filename
* @return {Stream}
*/
get_tarball(name, options, filename) {
// FUTURE: es6 note: this must be default parameter
if (!options) {
options = {};
}
return this.get_url(`${this.config.url}'/'${name}/-/${filename}`);
}
/**
* Get an url.
* @param {String} url
* @return {Stream}
*/
get_url(url) {
const stream = new MyStreams.ReadTarball();
const stream = new MyStreams.ReadTarball({});
stream.abort = function() {};
let current_length = 0;
let expected_length;
let rstream = this.request({
let readStream = this.request({
uri_full: url,
encoding: null,
headers: {Accept: 'application/octet-stream'},
});
rstream.on('response', function(res) {
readStream.on('response', function(res) {
if (res.statusCode === 404) {
return stream.emit('error', Error[404]('file doesn\'t exist on uplink'));
return stream.emit('error', createError[404]('file doesn\'t exist on uplink'));
}
if (!(res.statusCode >= 200 && res.statusCode < 300)) {
return stream.emit('error', Error('bad uplink status code: ' + res.statusCode));
return stream.emit('error', createError('bad uplink status code: ' + res.statusCode));
}
if (res.headers['content-length']) {
expected_length = res.headers['content-length'];
stream.emit('content-length', res.headers['content-length']);
}
rstream.pipe(stream);
readStream.pipe(stream);
});
rstream.on('error', function(err) {
readStream.on('error', function(err) {
stream.emit('error', err);
});
rstream.on('data', function(d) {
current_length += d.length;
readStream.on('data', function(data) {
current_length += data.length;
});
rstream.on('end', function(d) {
if (d) current_length += d.length;
readStream.on('end', function(data) {
if (data) {
current_length += data.length;
}
if (expected_length && current_length != expected_length)
stream.emit('error', Error('content length mismatch'));
stream.emit('error', createError('content length mismatch'));
});
return stream;
}
/**
* Perform a stream search.
* @param {*} startkey keyword
* @param {*} options request options
* @return {Stream}
*/
search(startkey, options) {
const stream = new Stream.PassThrough({objectMode: true});
let req = this.request({
search(options) {
const transformStream = new Stream.PassThrough({objectMode: true});
const requestStream = this.request({
uri: options.req.url,
req: options.req,
headers: {
@ -413,13 +320,13 @@ class Storage {
let parsePackage = (pkg) => {
if (Utils.is_object(pkg)) {
stream.emit('data', pkg);
transformStream.emit('data', pkg);
}
};
req.on('response', (res) => {
requestStream.on('response', (res) => {
if (!String(res.statusCode).match(/^2\d\d$/)) {
return stream.emit('error', Error('bad status code ' + res.statusCode + ' from uplink'));
return transformStream.emit('error', createError(`bad status code ${res.statusCode} from uplink`));
}
// See https://github.com/request/request#requestoptions-callback
@ -432,20 +339,20 @@ class Storage {
}
jsonStream.pipe(JSONStream.parse('*')).on('data', parsePackage);
jsonStream.on('end', () => {
stream.emit('end');
transformStream.emit('end');
});
});
req.on('error', (err) => {
stream.emit('error', err);
requestStream.on('error', (err) => {
transformStream.emit('error', err);
});
stream.abort = () => {
req.abort();
stream.emit('end');
transformStream.abort = () => {
requestStream.abort();
transformStream.emit('end');
};
return stream;
return transformStream;
}
/**
@ -453,7 +360,7 @@ class Storage {
* @param {*} req the http request
* @param {*} headers the request headers
*/
_add_proxy_headers(req, headers) {
_addProxyHeaders(req, headers) {
if (req) {
// Only submit X-Forwarded-For field if we don't have a proxy selected
// in the config file.
@ -461,7 +368,7 @@ class Storage {
// Otherwise misconfigured proxy could return 407:
// https://github.com/rlidwka/sinopia/issues/254
//
if (!this.proxy) {
if (this.proxy === false) {
headers['X-Forwarded-For'] = (
req && req.headers['x-forwarded-for']
? req.headers['x-forwarded-for'] + ', '
@ -479,6 +386,96 @@ class Storage {
headers['Via'] += '1.1 ' + this.server_id + ' (Verdaccio)';
}
/**
* Check whether the remote host is available.
* @param {*} alive
* @return {Boolean}
*/
_statusCheck(alive) {
if (arguments.length === 0) {
return this._ifRequestFailure() === false;
} else {
if (alive) {
if (this.failed_requests >= this.max_fails) {
this.logger.warn({
host: this.url.host,
}, 'host @{host} is back online');
}
this.failed_requests = 0;
} else {
this.failed_requests ++;
if (this.failed_requests === this.max_fails) {
this.logger.warn({
host: this.url.host,
}, 'host @{host} is now offline');
}
}
this.last_request_time = Date.now();
}
}
/**
* If the request failure.
* @return {boolean}
* @private
*/
_ifRequestFailure() {
return this.failed_requests >= this.max_fails && Math.abs(Date.now() - this.last_request_time) < this.fail_timeout;
}
/**
* Set up a proxy.
* @param {*} hostname
* @param {*} config
* @param {*} mainconfig
* @param {*} isHTTPS
*/
_setupProxy(hostname, config, mainconfig, isHTTPS) {
let noProxyList;
let proxy_key = isHTTPS ? 'https_proxy' : 'http_proxy';
// get http_proxy and no_proxy configs
if (proxy_key in config) {
this.proxy = config[proxy_key];
} else if (proxy_key in mainconfig) {
this.proxy = mainconfig[proxy_key];
}
if ('no_proxy' in config) {
noProxyList = config.no_proxy;
} else if ('no_proxy' in mainconfig) {
noProxyList = mainconfig.no_proxy;
}
// use wget-like algorithm to determine if proxy shouldn't be used
if (hostname[0] !== '.') {
hostname = '.' + hostname;
}
if (_.isString(noProxyList) && noProxyList.length) {
noProxyList = noProxyList.split(',');
}
if (_.isArray(noProxyList)) {
for (let i = 0; i < noProxyList.length; i++) {
let noProxyItem = noProxyList[i];
if (noProxyItem[0] !== '.') noProxyItem = '.' + noProxyItem;
if (hostname.lastIndexOf(noProxyItem) === hostname.length - noProxyItem.length) {
if (this.proxy) {
this.logger.debug({url: this.url.href, rule: noProxyItem},
'not using proxy for @{url}, excluded by @{rule} rule');
this.proxy = false;
}
break;
}
}
}
// if it's non-string (i.e. "false"), don't use it
if (_.isString(this.proxy) === false) {
delete this.proxy;
} else {
this.logger.debug( {url: this.url.href, proxy: this.proxy}, 'using proxy @{proxy} for @{url}' );
}
}
}
module.exports = Storage;
module.exports = ProxyStorage;

View file

@ -1,8 +1,9 @@
'use strict';
const assert = require('assert');
const Semver = require('semver');
const semver = require('semver');
const URL = require('url');
const _ = require('lodash');
const Logger = require('./logger');
/**
@ -29,7 +30,7 @@ function validate_package(name) {
* @return {Boolean} whether is valid or not
*/
function validate_name(name) {
if (typeof(name) !== 'string') {
if (_.isString(name) === false) {
return false;
}
name = name.toLowerCase();
@ -54,8 +55,8 @@ function validate_name(name) {
* @param {*} obj the element
* @return {Boolean}
*/
function is_object(obj) {
return typeof(obj) === 'object' && obj !== null && !Array.isArray(obj);
function isObject(obj) {
return _.isObject(obj) && _.isNull(obj) === false && _.isArray(obj) === false;
}
/**
@ -66,14 +67,14 @@ function is_object(obj) {
* @return {Object} the object with additional properties as dist-tags ad versions
*/
function validate_metadata(object, name) {
assert(module.exports.is_object(object), 'not a json object');
assert(isObject(object), 'not a json object');
assert.equal(object.name, name);
if (!module.exports.is_object(object['dist-tags'])) {
if (!isObject(object['dist-tags'])) {
object['dist-tags'] = {};
}
if (!module.exports.is_object(object['versions'])) {
if (!isObject(object['versions'])) {
object['versions'] = {};
}
@ -127,8 +128,7 @@ function filter_tarball_urls(pkg, req, config) {
for (let ver in pkg.versions) {
if (Object.prototype.hasOwnProperty.call(pkg.versions, ver)) {
const dist = pkg.versions[ver].dist;
if (dist != null && dist.tarball != null) {
// dist.__verdaccio_orig_tarball = dist.tarball
if (_.isNull(dist) === false && _.isNull(dist.tarball) === false) {
dist.tarball = filter(dist.tarball);
}
}
@ -146,7 +146,7 @@ function filter_tarball_urls(pkg, req, config) {
function tag_version(data, version, tag) {
if (tag) {
if (data['dist-tags'][tag] !== version) {
if (Semver.parse(version, true)) {
if (semver.parse(version, true)) {
// valid version - store
data['dist-tags'][tag] = version;
return true;
@ -172,9 +172,9 @@ function get_version(object, version) {
return object.versions[version];
}
try {
version = Semver.parse(version, true);
version = semver.parse(version, true);
for (let k in object.versions) {
if (version.compare(Semver.parse(k, true)) === 0) {
if (version.compare(semver.parse(k, true)) === 0) {
return object.versions[k];
}
}
@ -194,28 +194,30 @@ function get_version(object, version) {
- http://[::1]:443/ - ipv6
- unix:/tmp/http.sock - unix sockets
- https://unix:/tmp/http.sock - unix sockets (https)
* @param {*} addr the internet address definition
* @param {*} urlAddress the internet address definition
* @return {Object|Null} literal object that represent the address parsed
*/
function parse_address(addr) {
function parse_address(urlAddress) {
//
// TODO: refactor it to something more reasonable?
//
// protocol : // ( host )|( ipv6 ): port /
let m = /^((https?):(\/\/)?)?((([^\/:]*)|\[([^\[\]]+)\]):)?(\d+)\/?$/.exec(addr);
let urlPattern = /^((https?):(\/\/)?)?((([^\/:]*)|\[([^\[\]]+)\]):)?(\d+)\/?$/.exec(urlAddress);
if (m) return {
proto: m[2] || 'http',
host: m[6] || m[7] || 'localhost',
port: m[8] || '4873',
};
if (urlPattern) {
return {
proto: urlPattern[2] || 'http',
host: urlPattern[6] || urlPattern[7] || 'localhost',
port: urlPattern[8] || '4873',
};
}
m = /^((https?):(\/\/)?)?unix:(.*)$/.exec(addr);
urlPattern = /^((https?):(\/\/)?)?unix:(.*)$/.exec(urlAddress);
if (m) {
if (urlPattern) {
return {
proto: m[2] || 'http',
path: m[4],
proto: urlPattern[2] || 'http',
path: urlPattern[4],
};
}
@ -227,16 +229,16 @@ function parse_address(addr) {
* @param {*} array
* @return {Array} sorted Array
*/
function semver_sort(array) {
function semverSort(array) {
return array
.filter(function(x) {
if (!Semver.parse(x, true)) {
if (!semver.parse(x, true)) {
Logger.logger.warn( {ver: x}, 'ignoring bad version @{ver}' );
return false;
}
return true;
})
.sort(Semver.compareLoose)
.sort(semver.compareLoose)
.map(String);
}
@ -248,17 +250,17 @@ function normalize_dist_tags(data) {
let sorted;
if (!data['dist-tags'].latest) {
// overwrite latest with highest known version based on semver sort
sorted = module.exports.semver_sort(Object.keys(data.versions));
sorted = semverSort(Object.keys(data.versions));
if (sorted && sorted.length) {
data['dist-tags'].latest = sorted.pop();
}
}
for (let tag in data['dist-tags']) {
if (Array.isArray(data['dist-tags'][tag])) {
if (_.isArray(data['dist-tags'][tag])) {
if (data['dist-tags'][tag].length) {
// sort array
sorted = module.exports.semver_sort(data['dist-tags'][tag]);
sorted = semverSort(data['dist-tags'][tag]);
if (sorted.length) {
// use highest version based on semver sort
data['dist-tags'][tag] = sorted.pop();
@ -266,8 +268,8 @@ function normalize_dist_tags(data) {
} else {
delete data['dist-tags'][tag];
}
} else if (typeof data['dist-tags'][tag] === 'string') {
if (!Semver.parse(data['dist-tags'][tag], true)) {
} else if (_.isString(data['dist-tags'][tag] )) {
if (!semver.parse(data['dist-tags'][tag], true)) {
// if the version is invalid, delete the dist-tag entry
delete data['dist-tags'][tag];
}
@ -275,7 +277,7 @@ function normalize_dist_tags(data) {
}
}
module.exports.semver_sort = semver_sort;
module.exports.semver_sort = semverSort;
module.exports.parse_address = parse_address;
module.exports.get_version = get_version;
module.exports.normalize_dist_tags = normalize_dist_tags;
@ -283,6 +285,6 @@ module.exports.tag_version = tag_version;
module.exports.combineBaseUrl = combineBaseUrl;
module.exports.filter_tarball_urls = filter_tarball_urls;
module.exports.validate_metadata = validate_metadata;
module.exports.is_object = is_object;
module.exports.is_object = isObject;
module.exports.validate_name = validate_name;
module.exports.validate_package = validate_package;