diff --git a/lib/local-fs.js b/lib/local-fs.js index 6a5de6f67..b2ca4cd46 100644 --- a/lib/local-fs.js +++ b/lib/local-fs.js @@ -64,7 +64,7 @@ const write = function(dest, data, cb) { }; const write_stream = function(name) { - const stream = MyStreams.uploadTarballStream(); + const stream = new MyStreams.UploadTarball(); let _ended = 0; stream.on('end', function() { _ended = 1; @@ -134,7 +134,7 @@ const read_stream = function(name, stream, callback) { }); }); - stream = MyStreams.readTarballStream(); + stream = new MyStreams.ReadTarball(); stream.abort = function() { rstream.close(); }; diff --git a/lib/local-storage.js b/lib/local-storage.js index 09b82d9c7..a3520e23c 100644 --- a/lib/local-storage.js +++ b/lib/local-storage.js @@ -8,7 +8,7 @@ const Crypto = require('crypto'); const fs = require('fs'); const Error = require('http-errors'); const Path = require('path'); -const Stream = require('readable-stream'); +const Stream = require('stream'); const URL = require('url'); const fs_storage = require('./local-fs'); @@ -380,7 +380,7 @@ class Storage { addTarball(name, filename) { assert(Utils.validate_name(filename)); - let stream = MyStreams.uploadTarballStream(); + let stream = new MyStreams.UploadTarball(); let _transform = stream._transform; let length = 0; let shasum = Crypto.createHash('sha1'); @@ -470,7 +470,7 @@ class Storage { */ getTarball(name, filename, callback) { assert(Utils.validate_name(filename)); - const stream = MyStreams.readTarballStream(); + const stream = new MyStreams.ReadTarball(); stream.abort = function() { if (rstream) { rstream.abort(); @@ -534,65 +534,6 @@ class Storage { }); } - /** - * This function allows to update the package thread-safely - Algorithm: - 1. lock package.json for writing - 2. read package.json - 3. updateFn(pkg, cb), and wait for cb - 4. write package.json.tmp - 5. move package.json.tmp package.json - 6. callback(err?) - * @param {*} name package name - * @param {*} updateFn function(package, cb) - update function - * @param {*} _callback callback that gets invoked after it's all updated - * @return {Function} - */ - _updatePackage(name, updateFn, _callback) { - const storage = this.storage(name); - if (!storage) { - return _callback( Error[404]('no such package available') ); - } - storage.lock_and_read_json(info_file, (err, json) => { - let locked = false; - - // callback that cleans up lock first - const callback = function(err) { - let _args = arguments; - if (locked) { - storage.unlock_file(info_file, function() { - // ignore any error from the unlock - _callback.apply(err, _args); - }); - } else { - _callback.apply(null, _args); - } - }; - - if (!err) { - locked = true; - } - - if (err) { - if (err.code === 'EAGAIN') { - return callback( Error[503]('resource temporarily unavailable') ); - } else if (err.code === 'ENOENT') { - return callback( Error[404]('no such package available') ); - } else { - return callback(err); - } - } - - this._normalizePackage(json); - updateFn(json, (err) => { - if (err) { - return callback(err); - } - this._writePackage(name, json, callback); - }); - }); - } - /** * Search a local package. * @param {*} startKey @@ -789,6 +730,65 @@ class Storage { return Error[500](); } + /** + * This function allows to update the package thread-safely + Algorithm: + 1. lock package.json for writing + 2. read package.json + 3. updateFn(pkg, cb), and wait for cb + 4. write package.json.tmp + 5. move package.json.tmp package.json + 6. callback(err?) + * @param {*} name package name + * @param {*} updateFn function(package, cb) - update function + * @param {*} _callback callback that gets invoked after it's all updated + * @return {Function} + */ + _updatePackage(name, updateFn, _callback) { + const storage = this.storage(name); + if (!storage) { + return _callback( Error[404]('no such package available') ); + } + storage.lock_and_read_json(info_file, (err, json) => { + let locked = false; + + // callback that cleans up lock first + const callback = function(err) { + let _args = arguments; + if (locked) { + storage.unlock_file(info_file, function() { + // ignore any error from the unlock + _callback.apply(err, _args); + }); + } else { + _callback.apply(null, _args); + } + }; + + if (!err) { + locked = true; + } + + if (err) { + if (err.code === 'EAGAIN') { + return callback( Error[503]('resource temporarily unavailable') ); + } else if (err.code === 'ENOENT') { + return callback( Error[404]('no such package available') ); + } else { + return callback(err); + } + } + + this._normalizePackage(json); + updateFn(json, (err) => { + if (err) { + return callback(err); + } + this._writePackage(name, json, callback); + }); + }); + } + /** * Update the revision (_rev) string for a package. * @param {*} name diff --git a/lib/storage.js b/lib/storage.js index 0584fd5df..e339aa2c7 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -1,12 +1,13 @@ 'use strict'; +const _ = require('lodash'); const assert = require('assert'); const async = require('async'); const Error = require('http-errors'); const semver = require('semver'); const Crypto = require('crypto'); -const _ = require('lodash'); const Stream = require('stream'); + const Search = require('./search'); const LocalStorage = require('./local-storage'); const Logger = require('./logger'); @@ -240,7 +241,7 @@ class Storage { * @return {Stream} */ get_tarball(name, filename) { - let stream = MyStreams.readTarballStream(); + let stream = new MyStreams.ReadTarball(); stream.abort = function() {}; let self = this; diff --git a/lib/streams.js b/lib/streams.js index b8ef0fc13..dcb50a3dd 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -1,45 +1,46 @@ 'use strict'; const Stream = require('stream'); -const Util = require('util'); /** * This stream is used to read tarballs from repository. * @param {*} options * @return {Object} */ -function ReadTarball(options) { - const self = new Stream.PassThrough(options); - Object.setPrototypeOf(self, ReadTarball.prototype); +class ReadTarball extends Stream.PassThrough { - // called when data is not needed anymore - add_abstract_method(self, 'abort'); - - return self; + /** + * + * @param {Object} options + */ + constructor(options) { + super(options); + // called when data is not needed anymore + add_abstract_method(this, 'abort'); + } } -Util.inherits(ReadTarball, Stream.PassThrough); - /** * This stream is used to upload tarballs to a repository. * @param {*} options * @return {Object} */ -function UploadTarball(options) { - const self = new Stream.PassThrough(options); - Object.setPrototypeOf(self, UploadTarball.prototype); +class UploadTarball extends Stream.PassThrough { - // called when user closes connection before upload finishes - add_abstract_method(self, 'abort'); + /** + * + * @param {Object} options + */ + constructor(options) { + super(options); + // called when user closes connection before upload finishes + add_abstract_method(this, 'abort'); - // called when upload finishes successfully - add_abstract_method(self, 'done'); - - return self; + // called when upload finishes successfully + add_abstract_method(this, 'done'); + } } -Util.inherits(UploadTarball, Stream.PassThrough); - /** * This function intercepts abstract calls and replays them allowing. * us to attach those functions after we are ready to do so @@ -63,5 +64,5 @@ function add_abstract_method(self, name) { }); } -module.exports.readTarballStream = ReadTarball; -module.exports.uploadTarballStream = UploadTarball; +module.exports.ReadTarball = ReadTarball; +module.exports.UploadTarball = UploadTarball; diff --git a/lib/up-storage.js b/lib/up-storage.js index 795318a1d..c208fc489 100644 --- a/lib/up-storage.js +++ b/lib/up-storage.js @@ -356,7 +356,7 @@ class Storage { * @return {Stream} */ get_url(url) { - const stream = MyStreams.readTarballStream(); + const stream = new MyStreams.ReadTarball(); stream.abort = function() {}; let current_length = 0; let expected_length; diff --git a/package.json b/package.json index 3ffdaf4e3..3996a72f4 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,6 @@ "minimatch": "^3.0.2", "mkdirp": "^0.5.1", "pkginfo": "^0.4.0", - "readable-stream": "^2.1.2", "render-readme": "^1.3.1", "request": "^2.72.0", "semver": "^5.1.0", diff --git a/test/unit/mystreams.js b/test/unit/mystreams.js index dfdf910f9..40f51e5dd 100644 --- a/test/unit/mystreams.js +++ b/test/unit/mystreams.js @@ -1,6 +1,6 @@ 'use strict'; -let ReadTarball = require('../../lib/streams').readTarballStream; +let ReadTarball = require('../../lib/streams').ReadTarball; describe('mystreams', function() { it('should delay events', function(cb) {