0
Fork 0
mirror of https://github.com/verdaccio/verdaccio.git synced 2024-12-16 21:56:25 -05:00

search: stream results from npmjs instead of buffering them

This commit is contained in:
Alex Kocharin 2015-05-17 01:29:16 +03:00
parent f1bb9f83e6
commit 45edca2218
8 changed files with 227 additions and 131 deletions

View file

@ -100,6 +100,11 @@ function Config(config) {
return flatten(result) return flatten(result)
} }
// add a default rule for all packages to make writing plugins easier
if (self.packages['**'] == null) {
self.packages['**'] = {}
}
for (var i in self.packages) { for (var i in self.packages) {
assert( assert(
typeof(self.packages[i]) === 'object' && typeof(self.packages[i]) === 'object' &&

View file

@ -1,4 +1,3 @@
var async = require('async')
var Cookies = require('cookies') var Cookies = require('cookies')
var express = require('express') var express = require('express')
var expressJson5 = require('express-json5') var expressJson5 = require('express-json5')
@ -84,33 +83,57 @@ module.exports = function(config, auth, storage) {
// searching packages // searching packages
app.get('/-/all/:anything?', function(req, res, next) { app.get('/-/all/:anything?', function(req, res, next) {
storage.search(req.param.startkey || 0, {req: req}, function(err, result) { var received_end = false
if (err) return next(err) var response_finished = false
async.eachSeries(Object.keys(result), function(pkg, cb) { var processing_pkgs = 0
auth.allow_access(pkg, req.remote_user, function(err, allowed) {
if (err) {
if (err.status && String(err.status).match(/^4\d\d$/)) {
// auth plugin returns 4xx user error,
// that's equivalent of !allowed basically
allowed = false
} else {
return cb(err)
}
}
if (!allowed) delete result[pkg] res.status(200)
cb() res.write('{"_updated":' + Date.now());
})
}, function(err) { var stream = storage.search(req.param.startkey || 0, { req: req })
if (err) return next(err)
next(result) stream.on('data', function each(pkg) {
processing_pkgs++
auth.allow_access(pkg.name, req.remote_user, function(err, allowed) {
processing_pkgs--
if (err) {
if (err.status && String(err.status).match(/^4\d\d$/)) {
// auth plugin returns 4xx user error,
// that's equivalent of !allowed basically
allowed = false
} else {
stream.abort(err)
}
}
if (allowed) {
res.write(',\n' + JSON.stringify(pkg.name) + ':' + JSON.stringify(pkg))
}
check_finish()
}) })
}) })
})
//app.get('/*', function(req, res) { stream.on('error', function (_err) {
// proxy.request(req, res) res.socket.destroy()
//}) })
stream.on('end', function () {
received_end = true
check_finish()
})
function check_finish() {
if (!received_end) return
if (processing_pkgs) return
if (response_finished) return
response_finished = true
res.end('}\n')
}
})
// placeholder 'cause npm require to be authenticated to publish // placeholder 'cause npm require to be authenticated to publish
// we do not do any real authentication yet // we do not do any real authentication yet

View file

@ -58,18 +58,6 @@ module.exports = function(config_hash) {
next() next()
}) })
/* app.get('/-/all', function(req, res) {
var https = require('https')
var JSONStream = require('JSONStream')
var request = require('request')({
url: 'https://registry.npmjs.org/-/all',
})
.pipe(JSONStream.parse('*'))
.on('data', function(d) {
console.log(d)
})
})*/
// hook for tests only // hook for tests only
if (config._debug) { if (config._debug) {
app.get('/-/_debug', function(req, res, next) { app.get('/-/_debug', function(req, res, next) {

View file

@ -1,8 +1,10 @@
var assert = require('assert') var assert = require('assert')
var async = require('async')
var Crypto = require('crypto') var Crypto = require('crypto')
var fs = require('fs') var fs = require('fs')
var Error = require('http-errors') var Error = require('http-errors')
var Path = require('path') var Path = require('path')
var Stream = require('readable-stream')
var URL = require('url') var URL = require('url')
var fs_storage = require('./local-fs') var fs_storage = require('./local-fs')
var Logger = require('./logger') var Logger = require('./logger')
@ -482,35 +484,55 @@ Storage.prototype.get_package = function(name, options, callback) {
}) })
} }
Storage.prototype.get_recent_packages = function(startkey, callback) { // walks through each package and calls `on_package` on them
Storage.prototype._each_package = function (on_package, on_end) {
var self = this var self = this
var i = 0 var storages = {}
var list = []
var storage = self.storage('') storages[self.config.storage] = true
if (!storage) return callback(null, [])
fs.readdir(storage.path, function(err, files) { if (self.config.packages) {
if (err) return callback(null, []) Object.keys(self.packages || {}).map(function (pkg) {
if (self.config.packages[pkg].storage) {
var filesL = files.length storages[self.config.packages[pkg].storage] = true
}
files.forEach(function(file) {
fs.stat(storage.path, function(err, stats) {
if (err) return callback(err)
if (stats.mtime > startkey && Utils.validate_name(file)) {
list.push({
time: stats.mtime,
name: file
})
}
if (++i !== filesL) {
return false
}
return callback(null, list)
})
}) })
}) }
var base = Path.dirname(self.config.self_path);
async.eachSeries(Object.keys(storages), function (storage, cb) {
fs.readdir(Path.resolve(base, storage), function (err, files) {
if (err) return cb(err)
async.eachSeries(files, function (file, cb) {
if (file.match(/^@/)) {
// scoped
fs.readdir(Path.resolve(base, storage, file), function (err, files) {
if (err) return cb(err)
async.eachSeries(files, function (file2, cb) {
if (Utils.validate_name(file2)) {
on_package({
name: file + '/' + file2,
path: Path.resolve(base, storage, file, file2),
}, cb)
} else {
cb()
}
}, cb)
})
} else if (Utils.validate_name(file)) {
on_package({
name: file,
path: Path.resolve(base, storage, file)
}, cb)
} else {
cb()
}
}, cb)
})
}, on_end)
} }
// //
@ -565,6 +587,55 @@ Storage.prototype.update_package = function(name, updateFn, _callback) {
}) })
} }
Storage.prototype.search = function(startkey, options) {
var self = this
var stream = new Stream.PassThrough({ objectMode: true })
self._each_package(function on_package(item, cb) {
fs.stat(item.path, function(err, stats) {
if (err) return cb(err)
if (stats.mtime > startkey) {
self.get_package(item.name, options, function(err, data) {
if (err) return cb(err)
var versions = Utils.semver_sort(Object.keys(data.versions))
var latest = versions[versions.length - 1]
if (data.versions[latest]) {
stream.push({
name : data.versions[latest].name,
description : data.versions[latest].description,
'dist-tags' : { latest: latest },
maintainers : data.versions[latest].maintainers ||
[ data.versions[latest]._npmUser ].filter(Boolean),
author : data.versions[latest].author,
repository : data.versions[latest].repository,
readmeFilename : data.versions[latest].readmeFilename || '',
homepage : data.versions[latest].homepage,
keywords : data.versions[latest].keywords,
bugs : data.versions[latest].bugs,
license : data.versions[latest].license,
time : { modified: item.time ? new Date(item.time).toISOString() : undefined },
versions : {},
})
}
cb()
})
} else {
cb()
}
})
}, function on_end(err) {
if (err) return stream.emit('error', err)
stream.end()
})
return stream
}
Storage.prototype._normalize_package = function(pkg) { Storage.prototype._normalize_package = function(pkg) {
;['versions', 'dist-tags', '_distfiles', '_attachments', '_uplinks'].forEach(function(key) { ;['versions', 'dist-tags', '_distfiles', '_attachments', '_uplinks'].forEach(function(key) {
if (!Utils.is_object(pkg[key])) pkg[key] = {} if (!Utils.is_object(pkg[key])) pkg[key] = {}

View file

@ -1,6 +1,7 @@
var assert = require('assert') var assert = require('assert')
var async = require('async') var async = require('async')
var Error = require('http-errors') var Error = require('http-errors')
var Stream = require('stream')
var Local = require('./local-storage') var Local = require('./local-storage')
var Logger = require('./logger') var Logger = require('./logger')
var MyStreams = require('./streams') var MyStreams = require('./streams')
@ -328,86 +329,49 @@ Storage.prototype.get_package = function(name, options, callback) {
// //
// Retrieve remote and local packages more recent than {startkey} // Retrieve remote and local packages more recent than {startkey}
// //
// Function invokes uplink.request for npm and local.get_recent_packages for // Function streams all packages from all uplinks first, and then
// local ones then sum up the result in a json object // local packages.
//
// Note that local packages could override registry ones just because
// they appear in JSON last. That's a trade-off we make to avoid
// memory issues.
// //
// Used storages: local && uplink (proxy_access) // Used storages: local && uplink (proxy_access)
// //
Storage.prototype.search = function(startkey, options, callback) { Storage.prototype.search = function(startkey, options) {
var self = this var self = this
var uplinks = []
var i = 0
var uplinks var stream = new Stream.PassThrough({ objectMode: true })
for (var p in self.uplinks) {
uplinks.push(p)
}
function merge_with_local_packages(err, res, body) { async.eachSeries(Object.keys(self.uplinks), function(up_name, cb) {
if (err) return callback(err) // shortcut: if `local=1` is supplied, don't call uplinks
var j = 0 if (options.req.query.local !== undefined) return cb()
self.local.get_recent_packages(startkey, function(err, list) { var lstream = self.uplinks[up_name].search(startkey, options)
if (err) return callback(err) lstream.pipe(stream, { end: false })
lstream.on('error', function (err) {
var listL = list.length self.logger.error({ err: err }, 'uplink error: @{err.message}')
cb(), cb = function () {}
if (!listL) return callback(null, body) })
lstream.on('end', function () {
list.forEach(function(item) { cb(), cb = function () {}
self.local.get_package(item.name, options, function(err, data) {
if (err) return callback(err)
var versions = Utils.semver_sort(Object.keys(data.versions))
var latest = versions[versions.length - 1]
if (data.versions[latest]) {
body[item.name] = {
name : data.versions[latest].name,
description : data.versions[latest].description,
'dist-tags' : { latest: latest },
maintainers : data.versions[latest].maintainers || [data.versions[latest]._npmUser].filter(Boolean),
readmeFilename: data.versions[latest].readmeFilename || '',
time : {
modified: new Date(item.time).toISOString()
},
versions : {},
repository : data.versions[latest].repository,
keywords : data.versions[latest].keywords
}
body[item.name].versions[latest] = 'latest'
}
if (++j !== listL) {
return false
}
return callback(null, body)
})
})
}) })
}
function remote_search() { stream.abort = function () {
var uplink = self.uplinks[uplinks[i]] if (lstream.abort) lstream.abort()
if (options.req.query.local !== undefined || !uplink) { cb(), cb = function () {}
return merge_with_local_packages(null, null, {})
} }
self.uplinks[uplinks[i]].request({ }, function () {
uri : options.req.url, var lstream = self.local.search(startkey, options)
timeout : self.uplinks[p].timeout, stream.abort = function () { lstream.abort() }
json : true, lstream.pipe(stream, { end: true })
req : options.req, lstream.on('error', function (err) {
}, function(err, res, body) { self.logger.error({ err: err }, 'search error: @{err.message}')
if (err || Math.floor(res.statusCode / 100) > 3) { stream.end()
i++
return remote_search()
}
return merge_with_local_packages(err, res, body)
}) })
} })
remote_search() return stream
} }
Storage.prototype.get_local = function(callback) { Storage.prototype.get_local = function(callback) {

View file

@ -1,6 +1,7 @@
var JSONStream = require('JSONStream')
var Error = require('http-errors') var Error = require('http-errors')
var request = require('request') var request = require('request')
var Stream = require('stream') var Stream = require('readable-stream')
var URL = require('url') var URL = require('url')
var parse_interval = require('./config').parse_interval var parse_interval = require('./config').parse_interval
var Logger = require('./logger') var Logger = require('./logger')
@ -320,6 +321,44 @@ Storage.prototype.get_url = function(url) {
return stream return stream
} }
Storage.prototype.search = function(startkey, options) {
var self = this
var stream = new Stream.PassThrough({ objectMode: true })
var req = self.request({
uri: options.req.url,
req: options.req,
})
req.on('response', function (res) {
if (!String(res.statusCode).match(/^2\d\d$/)) {
return stream.emit('error', Error('bad status code ' + res.statusCode + ' from uplink'))
}
res.pipe(JSONStream.parse('*')).on('data', function (pkg) {
if (Utils.is_object(pkg)) {
stream.emit('data', pkg)
}
})
res.on('end', function () {
stream.emit('end')
})
})
req.on('error', function (err) {
stream.emit('error', err)
})
stream.abort = function () {
req.abort()
stream.emit('end')
}
return stream
}
Storage.prototype._add_proxy_headers = function(req, headers) { Storage.prototype._add_proxy_headers = function(req, headers) {
if (req) { if (req) {
headers['X-Forwarded-For'] = ( headers['X-Forwarded-For'] = (

View file

@ -39,11 +39,15 @@ dependencies:
lunr: '>=0.5.2 <1.0.0-0' lunr: '>=0.5.2 <1.0.0-0'
render-readme: '>=0.2.1' render-readme: '>=0.2.1'
jju: '1.x' jju: '1.x'
JSONStream: '1.x'
mkdirp: '>=0.3.5 <1.0.0-0' mkdirp: '>=0.3.5 <1.0.0-0'
sinopia-htpasswd: '>= 0.4.3' sinopia-htpasswd: '>= 0.4.3'
http-errors: '>=1.2.0' http-errors: '>=1.2.0'
# node 0.10 compatibility, should go away soon
readable-stream: '~1.1.0'
optionalDependencies: optionalDependencies:
# those are native modules that could fail to compile # those are native modules that could fail to compile
# and unavailable on windows # and unavailable on windows

View file

@ -89,11 +89,13 @@ module.exports = function() {
assert.deepEqual(obj['testpkg-newnpmreg'], assert.deepEqual(obj['testpkg-newnpmreg'],
{ name: 'testpkg-newnpmreg', { name: 'testpkg-newnpmreg',
description: '', description: '',
author: '',
license: 'ISC',
'dist-tags': { latest: '0.0.0' }, 'dist-tags': { latest: '0.0.0' },
maintainers: [ { name: 'alex', email: 'alex@kocharin.ru' } ], maintainers: [ { name: 'alex', email: 'alex@kocharin.ru' } ],
readmeFilename: '', readmeFilename: '',
time: { modified: '2014-10-02T07:07:51.000Z' }, time: { modified: '2014-10-02T07:07:51.000Z' },
versions: { '0.0.0': 'latest' }, versions: {},
repository: { type: 'git', url: '' } }) repository: { type: 'git', url: '' } })
} }