From 8fe23d3393c85bd28d63c11ee108f312c31c86c0 Mon Sep 17 00:00:00 2001 From: Alex Kocharin Date: Fri, 27 Sep 2013 12:56:13 +0400 Subject: [PATCH] moving readable stream interfaces from through to streams2 --- lib/local-fs.js | 4 ++++ lib/local-storage.js | 19 ++++++++++++------- lib/storage.js | 36 ++++++++++++++++++------------------ lib/up-storage.js | 10 ++++------ 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/lib/local-fs.js b/lib/local-fs.js index 65b933798..5e75bc478 100644 --- a/lib/local-fs.js +++ b/lib/local-fs.js @@ -62,6 +62,10 @@ function write_stream(name) { }); file.destroySoon(); }); + file.on('open', function() { + // re-emitting open because it's handled in storage.js + stream.emit('open'); + }); stream.on('abort', function() { file.on('close', function() { fs.unlink(tmpname); diff --git a/lib/local-storage.js b/lib/local-storage.js index 2273d4283..98992ac87 100644 --- a/lib/local-storage.js +++ b/lib/local-storage.js @@ -1,10 +1,11 @@ var fs = require('fs'); var semver = require('semver'); -var through = require('through'); var Path = require('path'); +var through = require('through'); var fs_storage = require('./local-fs'); var UError = require('./error').UserError; var utils = require('./utils'); +var mystreams = require('./streams'); var info_file = 'package.json'; // @@ -15,7 +16,7 @@ function Storage(config) { if (!(this instanceof Storage)) return new Storage(config); this.config = config; var path = Path.resolve(Path.dirname(this.config.self_path), this.config.storage); - this.storage = new fs_storage(this.config.storage); + this.storage = new fs_storage(path); return this; } @@ -162,6 +163,10 @@ Storage.prototype.add_tarball = function(name, filename) { } }); + wstream.on('open', function() { + // re-emitting open because it's handled in storage.js + stream.emit('open'); + }); wstream.on('close', function() { stream.emit('end'); }); @@ -173,11 +178,10 @@ Storage.prototype.add_tarball = function(name, filename) { } Storage.prototype.get_tarball = function(name, filename, callback) { - var stream = through(function(data) { - this.queue(data); - }, function() { - this.queue(null); - }); + var stream = new mystreams.ReadTarballStream(); + stream.abort = function() { + rstream.close(); + }; var rstream = this.storage.read_stream(name + '/' + filename); rstream.on('error', function(err) { @@ -191,6 +195,7 @@ Storage.prototype.get_tarball = function(name, filename, callback) { } }); rstream.on('open', function() { + // re-emitting open because it's handled in storage.js stream.emit('open'); rstream.pipe(stream); }); diff --git a/lib/storage.js b/lib/storage.js index b2f46327f..9cea86fe4 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -4,6 +4,7 @@ var through = require('through'); var UError = require('./error').UserError; var Local = require('./local-storage'); var Proxy = require('./up-storage'); +var mystreams = require('./streams'); var utils = require('./utils'); // @@ -98,11 +99,8 @@ Storage.prototype.add_tarball = function(name, filename) { // Used storages: local || uplink (just one) // Storage.prototype.get_tarball = function(name, filename) { - var stream = through(function(data) { - this.queue(data); - }, function() { - this.queue(null); - }); + var stream = new mystreams.ReadTarballStream(); + stream.abort = function() {}; var self = this; @@ -116,11 +114,12 @@ Storage.prototype.get_tarball = function(name, filename) { if (is_open || err.status !== 404) { return stream.emit('error', err); } - + // local reported 404 var err404 = err; var uplink = null; - rstream.destroy(); + rstream.abort(); + rstream = null; // gc self.local.get_package(name, function(err, info) { if (err) return stream.emit('error', err); @@ -145,20 +144,21 @@ Storage.prototype.get_tarball = function(name, filename) { var savestream = self.local.add_tarball(name, filename); savestream.on('error', function(err) { +console.log('xx!', name, filename); + savestream.abort(); stream.emit('error', err); }); +console.log('!', name, filename); + savestream.on('open', function() { +console.log('x!', name, filename); + var rstream2 = uplink.get_url(file.url); + rstream2.on('error', function(err) { + stream.emit('error', err); + }); - var rstream2 = uplink.get_url(file.url); - rstream2.on('error', function(err) { - stream.emit('error', err); - }); - rstream2.on('data', function(data) { - stream.write(data); - savestream.write(data); - }); - rstream2.on('end', function() { - stream.end(); - savestream.end(); + // XXX: check, what would happen if client disconnects? + rstream2.pipe(stream); + rstream2.pipe(savestream); }); }); }); diff --git a/lib/up-storage.js b/lib/up-storage.js index f66344433..957e604a2 100644 --- a/lib/up-storage.js +++ b/lib/up-storage.js @@ -1,7 +1,8 @@ +var URL = require('url'); var request = require('request'); var through = require('through'); var UError = require('./error').UserError; -var URL = require('url'); +var mystreams = require('./streams'); // // Implements Storage interface @@ -77,11 +78,8 @@ Storage.prototype.get_tarball = function(name, filename) { } Storage.prototype.get_url = function(url) { - var stream = through(function(data) { - this.queue(data); - }, function() { - this.queue(null); - }); + var stream = new mystreams.ReadTarballStream(); + stream.abort = function() {}; var rstream = request({ url: url,