0
Fork 0
mirror of https://github.com/verdaccio/verdaccio.git synced 2025-03-18 02:22:46 -05:00

Refactor streams, removed not needed dependency

This commit is contained in:
Juan Picado @jotadeveloper 2017-05-26 07:21:30 +02:00
parent e9929c23e1
commit 1307181005
No known key found for this signature in database
GPG key ID: 18AC54485952D158
7 changed files with 93 additions and 92 deletions

View file

@ -64,7 +64,7 @@ const write = function(dest, data, cb) {
};
const write_stream = function(name) {
const stream = MyStreams.uploadTarballStream();
const stream = new MyStreams.UploadTarball();
let _ended = 0;
stream.on('end', function() {
_ended = 1;
@ -134,7 +134,7 @@ const read_stream = function(name, stream, callback) {
});
});
stream = MyStreams.readTarballStream();
stream = new MyStreams.ReadTarball();
stream.abort = function() {
rstream.close();
};

View file

@ -8,7 +8,7 @@ const Crypto = require('crypto');
const fs = require('fs');
const Error = require('http-errors');
const Path = require('path');
const Stream = require('readable-stream');
const Stream = require('stream');
const URL = require('url');
const fs_storage = require('./local-fs');
@ -380,7 +380,7 @@ class Storage {
addTarball(name, filename) {
assert(Utils.validate_name(filename));
let stream = MyStreams.uploadTarballStream();
let stream = new MyStreams.UploadTarball();
let _transform = stream._transform;
let length = 0;
let shasum = Crypto.createHash('sha1');
@ -470,7 +470,7 @@ class Storage {
*/
getTarball(name, filename, callback) {
assert(Utils.validate_name(filename));
const stream = MyStreams.readTarballStream();
const stream = new MyStreams.ReadTarball();
stream.abort = function() {
if (rstream) {
rstream.abort();
@ -534,65 +534,6 @@ class Storage {
});
}
/**
* This function allows to update the package thread-safely
Algorithm:
1. lock package.json for writing
2. read package.json
3. updateFn(pkg, cb), and wait for cb
4. write package.json.tmp
5. move package.json.tmp package.json
6. callback(err?)
* @param {*} name package name
* @param {*} updateFn function(package, cb) - update function
* @param {*} _callback callback that gets invoked after it's all updated
* @return {Function}
*/
_updatePackage(name, updateFn, _callback) {
const storage = this.storage(name);
if (!storage) {
return _callback( Error[404]('no such package available') );
}
storage.lock_and_read_json(info_file, (err, json) => {
let locked = false;
// callback that cleans up lock first
const callback = function(err) {
let _args = arguments;
if (locked) {
storage.unlock_file(info_file, function() {
// ignore any error from the unlock
_callback.apply(err, _args);
});
} else {
_callback.apply(null, _args);
}
};
if (!err) {
locked = true;
}
if (err) {
if (err.code === 'EAGAIN') {
return callback( Error[503]('resource temporarily unavailable') );
} else if (err.code === 'ENOENT') {
return callback( Error[404]('no such package available') );
} else {
return callback(err);
}
}
this._normalizePackage(json);
updateFn(json, (err) => {
if (err) {
return callback(err);
}
this._writePackage(name, json, callback);
});
});
}
/**
* Search a local package.
* @param {*} startKey
@ -789,6 +730,65 @@ class Storage {
return Error[500]();
}
/**
* This function allows to update the package thread-safely
Algorithm:
1. lock package.json for writing
2. read package.json
3. updateFn(pkg, cb), and wait for cb
4. write package.json.tmp
5. move package.json.tmp package.json
6. callback(err?)
* @param {*} name package name
* @param {*} updateFn function(package, cb) - update function
* @param {*} _callback callback that gets invoked after it's all updated
* @return {Function}
*/
_updatePackage(name, updateFn, _callback) {
const storage = this.storage(name);
if (!storage) {
return _callback( Error[404]('no such package available') );
}
storage.lock_and_read_json(info_file, (err, json) => {
let locked = false;
// callback that cleans up lock first
const callback = function(err) {
let _args = arguments;
if (locked) {
storage.unlock_file(info_file, function() {
// ignore any error from the unlock
_callback.apply(err, _args);
});
} else {
_callback.apply(null, _args);
}
};
if (!err) {
locked = true;
}
if (err) {
if (err.code === 'EAGAIN') {
return callback( Error[503]('resource temporarily unavailable') );
} else if (err.code === 'ENOENT') {
return callback( Error[404]('no such package available') );
} else {
return callback(err);
}
}
this._normalizePackage(json);
updateFn(json, (err) => {
if (err) {
return callback(err);
}
this._writePackage(name, json, callback);
});
});
}
/**
* Update the revision (_rev) string for a package.
* @param {*} name

View file

@ -1,12 +1,13 @@
'use strict';
const _ = require('lodash');
const assert = require('assert');
const async = require('async');
const Error = require('http-errors');
const semver = require('semver');
const Crypto = require('crypto');
const _ = require('lodash');
const Stream = require('stream');
const Search = require('./search');
const LocalStorage = require('./local-storage');
const Logger = require('./logger');
@ -240,7 +241,7 @@ class Storage {
* @return {Stream}
*/
get_tarball(name, filename) {
let stream = MyStreams.readTarballStream();
let stream = new MyStreams.ReadTarball();
stream.abort = function() {};
let self = this;

View file

@ -1,45 +1,46 @@
'use strict';
const Stream = require('stream');
const Util = require('util');
/**
* This stream is used to read tarballs from repository.
* @param {*} options
* @return {Object}
*/
function ReadTarball(options) {
const self = new Stream.PassThrough(options);
Object.setPrototypeOf(self, ReadTarball.prototype);
class ReadTarball extends Stream.PassThrough {
// called when data is not needed anymore
add_abstract_method(self, 'abort');
return self;
/**
*
* @param {Object} options
*/
constructor(options) {
super(options);
// called when data is not needed anymore
add_abstract_method(this, 'abort');
}
}
Util.inherits(ReadTarball, Stream.PassThrough);
/**
* This stream is used to upload tarballs to a repository.
* @param {*} options
* @return {Object}
*/
function UploadTarball(options) {
const self = new Stream.PassThrough(options);
Object.setPrototypeOf(self, UploadTarball.prototype);
class UploadTarball extends Stream.PassThrough {
// called when user closes connection before upload finishes
add_abstract_method(self, 'abort');
/**
*
* @param {Object} options
*/
constructor(options) {
super(options);
// called when user closes connection before upload finishes
add_abstract_method(this, 'abort');
// called when upload finishes successfully
add_abstract_method(self, 'done');
return self;
// called when upload finishes successfully
add_abstract_method(this, 'done');
}
}
Util.inherits(UploadTarball, Stream.PassThrough);
/**
* This function intercepts abstract calls and replays them allowing.
* us to attach those functions after we are ready to do so
@ -63,5 +64,5 @@ function add_abstract_method(self, name) {
});
}
module.exports.readTarballStream = ReadTarball;
module.exports.uploadTarballStream = UploadTarball;
module.exports.ReadTarball = ReadTarball;
module.exports.UploadTarball = UploadTarball;

View file

@ -356,7 +356,7 @@ class Storage {
* @return {Stream}
*/
get_url(url) {
const stream = MyStreams.readTarballStream();
const stream = new MyStreams.ReadTarball();
stream.abort = function() {};
let current_length = 0;
let expected_length;

View file

@ -36,7 +36,6 @@
"minimatch": "^3.0.2",
"mkdirp": "^0.5.1",
"pkginfo": "^0.4.0",
"readable-stream": "^2.1.2",
"render-readme": "^1.3.1",
"request": "^2.72.0",
"semver": "^5.1.0",

View file

@ -1,6 +1,6 @@
'use strict';
let ReadTarball = require('../../lib/streams').readTarballStream;
let ReadTarball = require('../../lib/streams').ReadTarball;
describe('mystreams', function() {
it('should delay events', function(cb) {