From 45edca22183e82b2a1cc69289096a1529f8dd043 Mon Sep 17 00:00:00 2001 From: Alex Kocharin Date: Sun, 17 May 2015 01:29:16 +0300 Subject: [PATCH] search: stream results from npmjs instead of buffering them --- lib/config.js | 5 ++ lib/index-api.js | 71 +++++++++++++------- lib/index.js | 12 ---- lib/local-storage.js | 121 +++++++++++++++++++++++++++-------- lib/storage.js | 100 +++++++++-------------------- lib/up-storage.js | 41 +++++++++++- package.yaml | 4 ++ test/functional/newnpmreg.js | 4 +- 8 files changed, 227 insertions(+), 131 deletions(-) diff --git a/lib/config.js b/lib/config.js index ba51ff582..e9add9ec7 100644 --- a/lib/config.js +++ b/lib/config.js @@ -100,6 +100,11 @@ function Config(config) { return flatten(result) } + // add a default rule for all packages to make writing plugins easier + if (self.packages['**'] == null) { + self.packages['**'] = {} + } + for (var i in self.packages) { assert( typeof(self.packages[i]) === 'object' && diff --git a/lib/index-api.js b/lib/index-api.js index 58aa50ef0..1aa90cd6e 100644 --- a/lib/index-api.js +++ b/lib/index-api.js @@ -1,4 +1,3 @@ -var async = require('async') var Cookies = require('cookies') var express = require('express') var expressJson5 = require('express-json5') @@ -84,33 +83,57 @@ module.exports = function(config, auth, storage) { // searching packages app.get('/-/all/:anything?', function(req, res, next) { - storage.search(req.param.startkey || 0, {req: req}, function(err, result) { - if (err) return next(err) - async.eachSeries(Object.keys(result), function(pkg, cb) { - auth.allow_access(pkg, req.remote_user, function(err, allowed) { - if (err) { - if (err.status && String(err.status).match(/^4\d\d$/)) { - // auth plugin returns 4xx user error, - // that's equivalent of !allowed basically - allowed = false - } else { - return cb(err) - } - } + var received_end = false + var response_finished = false + var processing_pkgs = 0 - if (!allowed) delete result[pkg] - cb() - }) - }, function(err) { - if (err) return next(err) - next(result) + res.status(200) + res.write('{"_updated":' + Date.now()); + + var stream = storage.search(req.param.startkey || 0, { req: req }) + + stream.on('data', function each(pkg) { + processing_pkgs++ + + auth.allow_access(pkg.name, req.remote_user, function(err, allowed) { + processing_pkgs-- + + if (err) { + if (err.status && String(err.status).match(/^4\d\d$/)) { + // auth plugin returns 4xx user error, + // that's equivalent of !allowed basically + allowed = false + } else { + stream.abort(err) + } + } + + if (allowed) { + res.write(',\n' + JSON.stringify(pkg.name) + ':' + JSON.stringify(pkg)) + } + + check_finish() }) }) - }) - //app.get('/*', function(req, res) { - // proxy.request(req, res) - //}) + stream.on('error', function (_err) { + res.socket.destroy() + }) + + stream.on('end', function () { + received_end = true + check_finish() + }) + + function check_finish() { + if (!received_end) return + if (processing_pkgs) return + if (response_finished) return + + response_finished = true + res.end('}\n') + } + }) // placeholder 'cause npm require to be authenticated to publish // we do not do any real authentication yet diff --git a/lib/index.js b/lib/index.js index 1e21f1fa6..8e9c8ecc8 100644 --- a/lib/index.js +++ b/lib/index.js @@ -58,18 +58,6 @@ module.exports = function(config_hash) { next() }) -/* app.get('/-/all', function(req, res) { - var https = require('https') - var JSONStream = require('JSONStream') - var request = require('request')({ - url: 'https://registry.npmjs.org/-/all', - }) - .pipe(JSONStream.parse('*')) - .on('data', function(d) { - console.log(d) - }) - })*/ - // hook for tests only if (config._debug) { app.get('/-/_debug', function(req, res, next) { diff --git a/lib/local-storage.js b/lib/local-storage.js index cf34aa9ff..52a03210d 100644 --- a/lib/local-storage.js +++ b/lib/local-storage.js @@ -1,8 +1,10 @@ var assert = require('assert') +var async = require('async') var Crypto = require('crypto') var fs = require('fs') var Error = require('http-errors') var Path = require('path') +var Stream = require('readable-stream') var URL = require('url') var fs_storage = require('./local-fs') var Logger = require('./logger') @@ -482,35 +484,55 @@ Storage.prototype.get_package = function(name, options, callback) { }) } -Storage.prototype.get_recent_packages = function(startkey, callback) { +// walks through each package and calls `on_package` on them +Storage.prototype._each_package = function (on_package, on_end) { var self = this - var i = 0 - var list = [] + var storages = {} - var storage = self.storage('') - if (!storage) return callback(null, []) + storages[self.config.storage] = true - fs.readdir(storage.path, function(err, files) { - if (err) return callback(null, []) - - var filesL = files.length - - files.forEach(function(file) { - fs.stat(storage.path, function(err, stats) { - if (err) return callback(err) - if (stats.mtime > startkey && Utils.validate_name(file)) { - list.push({ - time: stats.mtime, - name: file - }) - } - if (++i !== filesL) { - return false - } - return callback(null, list) - }) + if (self.config.packages) { + Object.keys(self.packages || {}).map(function (pkg) { + if (self.config.packages[pkg].storage) { + storages[self.config.packages[pkg].storage] = true + } }) - }) + } + + var base = Path.dirname(self.config.self_path); + + async.eachSeries(Object.keys(storages), function (storage, cb) { + fs.readdir(Path.resolve(base, storage), function (err, files) { + if (err) return cb(err) + + async.eachSeries(files, function (file, cb) { + if (file.match(/^@/)) { + // scoped + fs.readdir(Path.resolve(base, storage, file), function (err, files) { + if (err) return cb(err) + + async.eachSeries(files, function (file2, cb) { + if (Utils.validate_name(file2)) { + on_package({ + name: file + '/' + file2, + path: Path.resolve(base, storage, file, file2), + }, cb) + } else { + cb() + } + }, cb) + }) + } else if (Utils.validate_name(file)) { + on_package({ + name: file, + path: Path.resolve(base, storage, file) + }, cb) + } else { + cb() + } + }, cb) + }) + }, on_end) } // @@ -565,6 +587,55 @@ Storage.prototype.update_package = function(name, updateFn, _callback) { }) } +Storage.prototype.search = function(startkey, options) { + var self = this + + var stream = new Stream.PassThrough({ objectMode: true }) + + self._each_package(function on_package(item, cb) { + fs.stat(item.path, function(err, stats) { + if (err) return cb(err) + + if (stats.mtime > startkey) { + self.get_package(item.name, options, function(err, data) { + if (err) return cb(err) + + var versions = Utils.semver_sort(Object.keys(data.versions)) + var latest = versions[versions.length - 1] + + if (data.versions[latest]) { + stream.push({ + name : data.versions[latest].name, + description : data.versions[latest].description, + 'dist-tags' : { latest: latest }, + maintainers : data.versions[latest].maintainers || + [ data.versions[latest]._npmUser ].filter(Boolean), + author : data.versions[latest].author, + repository : data.versions[latest].repository, + readmeFilename : data.versions[latest].readmeFilename || '', + homepage : data.versions[latest].homepage, + keywords : data.versions[latest].keywords, + bugs : data.versions[latest].bugs, + license : data.versions[latest].license, + time : { modified: item.time ? new Date(item.time).toISOString() : undefined }, + versions : {}, + }) + } + + cb() + }) + } else { + cb() + } + }) + }, function on_end(err) { + if (err) return stream.emit('error', err) + stream.end() + }) + + return stream +} + Storage.prototype._normalize_package = function(pkg) { ;['versions', 'dist-tags', '_distfiles', '_attachments', '_uplinks'].forEach(function(key) { if (!Utils.is_object(pkg[key])) pkg[key] = {} diff --git a/lib/storage.js b/lib/storage.js index a099a0196..3daace978 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -1,6 +1,7 @@ var assert = require('assert') var async = require('async') var Error = require('http-errors') +var Stream = require('stream') var Local = require('./local-storage') var Logger = require('./logger') var MyStreams = require('./streams') @@ -328,86 +329,49 @@ Storage.prototype.get_package = function(name, options, callback) { // // Retrieve remote and local packages more recent than {startkey} // -// Function invokes uplink.request for npm and local.get_recent_packages for -// local ones then sum up the result in a json object +// Function streams all packages from all uplinks first, and then +// local packages. +// +// Note that local packages could override registry ones just because +// they appear in JSON last. That's a trade-off we make to avoid +// memory issues. // // Used storages: local && uplink (proxy_access) // -Storage.prototype.search = function(startkey, options, callback) { +Storage.prototype.search = function(startkey, options) { var self = this - var uplinks = [] - var i = 0 - var uplinks - for (var p in self.uplinks) { - uplinks.push(p) - } + var stream = new Stream.PassThrough({ objectMode: true }) - function merge_with_local_packages(err, res, body) { - if (err) return callback(err) - var j = 0 + async.eachSeries(Object.keys(self.uplinks), function(up_name, cb) { + // shortcut: if `local=1` is supplied, don't call uplinks + if (options.req.query.local !== undefined) return cb() - self.local.get_recent_packages(startkey, function(err, list) { - if (err) return callback(err) - - var listL = list.length - - if (!listL) return callback(null, body) - - list.forEach(function(item) { - self.local.get_package(item.name, options, function(err, data) { - if (err) return callback(err) - - var versions = Utils.semver_sort(Object.keys(data.versions)) - var latest = versions[versions.length - 1] - - if (data.versions[latest]) { - body[item.name] = { - name : data.versions[latest].name, - description : data.versions[latest].description, - 'dist-tags' : { latest: latest }, - maintainers : data.versions[latest].maintainers || [data.versions[latest]._npmUser].filter(Boolean), - readmeFilename: data.versions[latest].readmeFilename || '', - time : { - modified: new Date(item.time).toISOString() - }, - versions : {}, - repository : data.versions[latest].repository, - keywords : data.versions[latest].keywords - } - body[item.name].versions[latest] = 'latest' - } - - if (++j !== listL) { - return false - } - - return callback(null, body) - }) - }) + var lstream = self.uplinks[up_name].search(startkey, options) + lstream.pipe(stream, { end: false }) + lstream.on('error', function (err) { + self.logger.error({ err: err }, 'uplink error: @{err.message}') + cb(), cb = function () {} + }) + lstream.on('end', function () { + cb(), cb = function () {} }) - } - function remote_search() { - var uplink = self.uplinks[uplinks[i]] - if (options.req.query.local !== undefined || !uplink) { - return merge_with_local_packages(null, null, {}) + stream.abort = function () { + if (lstream.abort) lstream.abort() + cb(), cb = function () {} } - self.uplinks[uplinks[i]].request({ - uri : options.req.url, - timeout : self.uplinks[p].timeout, - json : true, - req : options.req, - }, function(err, res, body) { - if (err || Math.floor(res.statusCode / 100) > 3) { - i++ - return remote_search() - } - return merge_with_local_packages(err, res, body) + }, function () { + var lstream = self.local.search(startkey, options) + stream.abort = function () { lstream.abort() } + lstream.pipe(stream, { end: true }) + lstream.on('error', function (err) { + self.logger.error({ err: err }, 'search error: @{err.message}') + stream.end() }) - } + }) - remote_search() + return stream } Storage.prototype.get_local = function(callback) { diff --git a/lib/up-storage.js b/lib/up-storage.js index 5993f2f1b..daf6ed6d2 100644 --- a/lib/up-storage.js +++ b/lib/up-storage.js @@ -1,6 +1,7 @@ +var JSONStream = require('JSONStream') var Error = require('http-errors') var request = require('request') -var Stream = require('stream') +var Stream = require('readable-stream') var URL = require('url') var parse_interval = require('./config').parse_interval var Logger = require('./logger') @@ -320,6 +321,44 @@ Storage.prototype.get_url = function(url) { return stream } +Storage.prototype.search = function(startkey, options) { + var self = this + + var stream = new Stream.PassThrough({ objectMode: true }) + + var req = self.request({ + uri: options.req.url, + req: options.req, + }) + + req.on('response', function (res) { + if (!String(res.statusCode).match(/^2\d\d$/)) { + return stream.emit('error', Error('bad status code ' + res.statusCode + ' from uplink')) + } + + res.pipe(JSONStream.parse('*')).on('data', function (pkg) { + if (Utils.is_object(pkg)) { + stream.emit('data', pkg) + } + }) + + res.on('end', function () { + stream.emit('end') + }) + }) + + req.on('error', function (err) { + stream.emit('error', err) + }) + + stream.abort = function () { + req.abort() + stream.emit('end') + } + + return stream +} + Storage.prototype._add_proxy_headers = function(req, headers) { if (req) { headers['X-Forwarded-For'] = ( diff --git a/package.yaml b/package.yaml index 517d44b74..cc7ab7ebf 100644 --- a/package.yaml +++ b/package.yaml @@ -39,11 +39,15 @@ dependencies: lunr: '>=0.5.2 <1.0.0-0' render-readme: '>=0.2.1' jju: '1.x' + JSONStream: '1.x' mkdirp: '>=0.3.5 <1.0.0-0' sinopia-htpasswd: '>= 0.4.3' http-errors: '>=1.2.0' + # node 0.10 compatibility, should go away soon + readable-stream: '~1.1.0' + optionalDependencies: # those are native modules that could fail to compile # and unavailable on windows diff --git a/test/functional/newnpmreg.js b/test/functional/newnpmreg.js index 6ec78498b..2565950d0 100644 --- a/test/functional/newnpmreg.js +++ b/test/functional/newnpmreg.js @@ -89,11 +89,13 @@ module.exports = function() { assert.deepEqual(obj['testpkg-newnpmreg'], { name: 'testpkg-newnpmreg', description: '', + author: '', + license: 'ISC', 'dist-tags': { latest: '0.0.0' }, maintainers: [ { name: 'alex', email: 'alex@kocharin.ru' } ], readmeFilename: '', time: { modified: '2014-10-02T07:07:51.000Z' }, - versions: { '0.0.0': 'latest' }, + versions: {}, repository: { type: 'git', url: '' } }) }