0
Fork 0
mirror of https://github.com/verdaccio/verdaccio.git synced 2024-12-30 22:34:10 -05:00

moving readable stream interfaces from through to streams2

This commit is contained in:
Alex Kocharin 2013-09-27 12:56:13 +04:00
parent 9f80a0046e
commit 8fe23d3393
4 changed files with 38 additions and 31 deletions

View file

@ -62,6 +62,10 @@ function write_stream(name) {
});
file.destroySoon();
});
file.on('open', function() {
// re-emitting open because it's handled in storage.js
stream.emit('open');
});
stream.on('abort', function() {
file.on('close', function() {
fs.unlink(tmpname);

View file

@ -1,10 +1,11 @@
var fs = require('fs');
var semver = require('semver');
var through = require('through');
var Path = require('path');
var through = require('through');
var fs_storage = require('./local-fs');
var UError = require('./error').UserError;
var utils = require('./utils');
var mystreams = require('./streams');
var info_file = 'package.json';
//
@ -15,7 +16,7 @@ function Storage(config) {
if (!(this instanceof Storage)) return new Storage(config);
this.config = config;
var path = Path.resolve(Path.dirname(this.config.self_path), this.config.storage);
this.storage = new fs_storage(this.config.storage);
this.storage = new fs_storage(path);
return this;
}
@ -162,6 +163,10 @@ Storage.prototype.add_tarball = function(name, filename) {
}
});
wstream.on('open', function() {
// re-emitting open because it's handled in storage.js
stream.emit('open');
});
wstream.on('close', function() {
stream.emit('end');
});
@ -173,11 +178,10 @@ Storage.prototype.add_tarball = function(name, filename) {
}
Storage.prototype.get_tarball = function(name, filename, callback) {
var stream = through(function(data) {
this.queue(data);
}, function() {
this.queue(null);
});
var stream = new mystreams.ReadTarballStream();
stream.abort = function() {
rstream.close();
};
var rstream = this.storage.read_stream(name + '/' + filename);
rstream.on('error', function(err) {
@ -191,6 +195,7 @@ Storage.prototype.get_tarball = function(name, filename, callback) {
}
});
rstream.on('open', function() {
// re-emitting open because it's handled in storage.js
stream.emit('open');
rstream.pipe(stream);
});

View file

@ -4,6 +4,7 @@ var through = require('through');
var UError = require('./error').UserError;
var Local = require('./local-storage');
var Proxy = require('./up-storage');
var mystreams = require('./streams');
var utils = require('./utils');
//
@ -98,11 +99,8 @@ Storage.prototype.add_tarball = function(name, filename) {
// Used storages: local || uplink (just one)
//
Storage.prototype.get_tarball = function(name, filename) {
var stream = through(function(data) {
this.queue(data);
}, function() {
this.queue(null);
});
var stream = new mystreams.ReadTarballStream();
stream.abort = function() {};
var self = this;
@ -116,11 +114,12 @@ Storage.prototype.get_tarball = function(name, filename) {
if (is_open || err.status !== 404) {
return stream.emit('error', err);
}
// local reported 404
var err404 = err;
var uplink = null;
rstream.destroy();
rstream.abort();
rstream = null; // gc
self.local.get_package(name, function(err, info) {
if (err) return stream.emit('error', err);
@ -145,20 +144,21 @@ Storage.prototype.get_tarball = function(name, filename) {
var savestream = self.local.add_tarball(name, filename);
savestream.on('error', function(err) {
console.log('xx!', name, filename);
savestream.abort();
stream.emit('error', err);
});
console.log('!', name, filename);
savestream.on('open', function() {
console.log('x!', name, filename);
var rstream2 = uplink.get_url(file.url);
rstream2.on('error', function(err) {
stream.emit('error', err);
});
var rstream2 = uplink.get_url(file.url);
rstream2.on('error', function(err) {
stream.emit('error', err);
});
rstream2.on('data', function(data) {
stream.write(data);
savestream.write(data);
});
rstream2.on('end', function() {
stream.end();
savestream.end();
// XXX: check, what would happen if client disconnects?
rstream2.pipe(stream);
rstream2.pipe(savestream);
});
});
});

View file

@ -1,7 +1,8 @@
var URL = require('url');
var request = require('request');
var through = require('through');
var UError = require('./error').UserError;
var URL = require('url');
var mystreams = require('./streams');
//
// Implements Storage interface
@ -77,11 +78,8 @@ Storage.prototype.get_tarball = function(name, filename) {
}
Storage.prototype.get_url = function(url) {
var stream = through(function(data) {
this.queue(data);
}, function() {
this.queue(null);
});
var stream = new mystreams.ReadTarballStream();
stream.abort = function() {};
var rstream = request({
url: url,