0
Fork 0
mirror of https://github.com/verdaccio/verdaccio.git synced 2025-01-20 22:52:46 -05:00

remove all replication-like functionality

apparently it was a bad idea, it's simpler to just run a single
sinopia instance as a master

TODO: write some help in readme about it
This commit is contained in:
Alex Kocharin 2013-12-27 17:23:14 +04:00
parent f3f4fdc4ac
commit 6c838c7947
6 changed files with 28 additions and 443 deletions

View file

@ -5,7 +5,6 @@ var async = require('async')
, Proxy = require('./up-storage') , Proxy = require('./up-storage')
, mystreams = require('./streams') , mystreams = require('./streams')
, utils = require('./utils') , utils = require('./utils')
, transaction = require('./transaction')
, Logger = require('./logger') , Logger = require('./logger')
// //
@ -34,16 +33,9 @@ function Storage(config) {
// Add a {name} package to a system // Add a {name} package to a system
// //
// Function checks if package with the same name is available from uplinks. // Function checks if package with the same name is available from uplinks.
// If it isn't, we create package metadata locally and send requests to do // If it isn't, we create package locally
// the same to all uplinks with write access. If all actions succeeded, we
// report success, if just one uplink fails, we abort.
// //
// TODO: if a package is uploaded to uplink1, but upload to uplink2 fails, // Used storages: local (write) && uplinks
// we report failure, but package is not removed from uplink1. This might
// require manual intervention.
//
// Used storages: local (write) && uplinks (proxy_access, r/o) &&
// uplinks (proxy_publish, write)
// //
Storage.prototype.add_package = function(name, metadata, callback) { Storage.prototype.add_package = function(name, metadata, callback) {
var self = this var self = this
@ -112,106 +104,26 @@ Storage.prototype.add_package = function(name, metadata, callback) {
} }
function publish_package(cb) { function publish_package(cb) {
var fw_uplinks = [] self.local.add_package(name, metadata, callback)
for (var i in self.uplinks) {
if (self.config.proxy_publish(name, i)) {
fw_uplinks.push(self.uplinks[i])
}
}
transaction(
fw_uplinks,
function localAction(cb) {
self.local.add_package(name, metadata, cb)
},
function localRollback(cb) {
self.local.remove_package(name, cb)
},
function remoteAction(remote, cb) {
remote.add_package(name, metadata, cb)
},
function remoteRollback(remote, cb) {
remote.remove_package(name, cb)
},
function(err) {
if (!err) {
callback()
} else if (err.uplink === 'local') {
return callback(err)
} else {
// hide uplink error with general message
return callback(new UError({
status: 503,
msg: 'can\'t upload to one of the uplinks, refuse to publish'
}))
}
}
)
} }
} }
// //
// Add a new version of package {name} to a system // Add a new version of package {name} to a system
// //
// Function uploads a new package version to all uplinks with write access // Used storages: local (write)
// and if everything succeeded it adds it locally.
//
// TODO: if a package is uploaded to uplink1, but upload to uplink2 fails,
// we report failure, but package is not removed from uplink1. This might
// require manual intervention.
//
// Used storages: local (write) && uplinks (proxy_publish, write)
// //
Storage.prototype.add_version = function(name, version, metadata, tag, callback) { Storage.prototype.add_version = function(name, version, metadata, tag, callback) {
var self = this return this.local.add_version(name, version, metadata, tag, callback)
var uplinks = []
for (var i in self.uplinks) {
if (self.config.proxy_publish(name, i)) {
uplinks.push(self.uplinks[i])
}
}
async.map(uplinks, function(up, cb) {
up.add_version(name, version, metadata, tag, cb)
}, function(err, results) {
if (err) {
return callback(new UError({
status: 503,
msg: 'can\'t upload to one of the uplinks, refuse to publish'
}))
}
self.local.add_version(name, version, metadata, tag, callback)
})
} }
//
// Tags a package version with a provided tag // Tags a package version with a provided tag
// //
// Function is basically the same as add_version // Used storages: local (write)
//
// Used storages: local (write) && uplinks (proxy_publish, write)
// //
Storage.prototype.add_tag = function(name, version, tag, callback) { Storage.prototype.add_tag = function(name, version, tag, callback) {
var self = this return this.local.add_tag(name, version, tag, callback)
var uplinks = []
for (var i in self.uplinks) {
if (self.config.proxy_publish(name, i)) {
uplinks.push(self.uplinks[i])
}
}
async.map(uplinks, function(up, cb) {
up.add_tag(name, version, tag, cb)
}, function(err, results) {
if (err) {
return callback(new UError({
status: 503,
msg: 'can\'t tag on one of the uplinks, refuse to proceed'
}))
}
self.local.add_tag(name, version, tag, callback)
})
} }
// //
@ -220,13 +132,7 @@ Storage.prototype.add_tag = function(name, version, tag, callback) {
// Function changes a package info from local storage and all uplinks with // Function changes a package info from local storage and all uplinks with
// write access. // write access.
// //
// TODO: currently it works only locally // Used storages: local (write)
//
// TODO: if a package is uploaded to uplink1, but upload to uplink2 fails,
// we report failure, but package is not removed from uplink1. This might
// require manual intervention.
//
// Used storages: local (write) && uplinks (proxy_publish, write)
// //
Storage.prototype.change_package = function(name, metadata, revision, callback) { Storage.prototype.change_package = function(name, metadata, revision, callback) {
return this.local.change_package(name, metadata, revision, callback) return this.local.change_package(name, metadata, revision, callback)
@ -235,16 +141,9 @@ Storage.prototype.change_package = function(name, metadata, revision, callback)
// //
// Remove a package from a system // Remove a package from a system
// //
// Function removes a package from local storage and all uplinks with // Function removes a package from local storage
// write access.
// //
// TODO: currently it works only locally // Used storages: local (write)
//
// TODO: if a package is uploaded to uplink1, but upload to uplink2 fails,
// we report failure, but package is not removed from uplink1. This might
// require manual intervention.
//
// Used storages: local (write) && uplinks (proxy_publish, write)
// //
Storage.prototype.remove_package = function(name, callback) { Storage.prototype.remove_package = function(name, callback) {
return this.local.remove_package(name, callback) return this.local.remove_package(name, callback)
@ -253,17 +152,11 @@ Storage.prototype.remove_package = function(name, callback) {
// //
// Remove a tarball from a system // Remove a tarball from a system
// //
// Function removes a tarball from local storage and all uplinks with // Function removes a tarball from local storage.
// write access. Tarball in question should not be linked to in any existing // Tarball in question should not be linked to in any existing
// versions, i.e. package version should be unpublished first. // versions, i.e. package version should be unpublished first.
// //
// TODO: currently it works only locally // Used storages: local (write)
//
// TODO: if a package is uploaded to uplink1, but upload to uplink2 fails,
// we report failure, but package is not removed from uplink1. This might
// require manual intervention.
//
// Used storages: local (write) && uplinks (proxy_publish, write)
// //
Storage.prototype.remove_tarball = function(name, filename, revision, callback) { Storage.prototype.remove_tarball = function(name, filename, revision, callback) {
return this.local.remove_tarball(name, filename, revision, callback) return this.local.remove_tarball(name, filename, revision, callback)
@ -274,71 +167,10 @@ Storage.prototype.remove_tarball = function(name, filename, revision, callback)
// //
// Function is syncronous and returns a WritableStream // Function is syncronous and returns a WritableStream
// //
// Function uploads a tarball to all uplinks with write access and to // Used storages: local (write)
// local storage in parallel with a speed of a slowest pipe. It reports
// success if all uploads succeed.
//
// Used storages: local (write) && uplinks (proxy_publish, write)
// //
Storage.prototype.add_tarball = function(name, filename) { Storage.prototype.add_tarball = function(name, filename) {
var stream = new mystreams.UploadTarballStream() return this.local.add_tarball(name, filename)
var self = this
var upstreams = []
var localstream = self.local.add_tarball(name, filename)
upstreams.push(localstream)
for (var i in self.uplinks) {
if (self.config.proxy_publish(name, i)) {
upstreams.push(self.uplinks[i].add_tarball(name, filename))
}
}
function bail(err) {
upstreams.forEach(function(upstream) {
upstream.abort()
})
}
upstreams.forEach(function(upstream) {
stream.pipe(upstream)
upstream.on('error', function(err) {
if (err.code === 'EEXISTS') {
stream.emit('error', new UError({
status: 409,
msg: 'this tarball is already present'
}))
} else if (!stream.status && upstream !== localstream) {
stream.emit('error', new UError({
status: 503,
msg: 'one or more uplinks are unreachable'
}))
} else {
stream.emit('error', err)
}
bail(err)
})
upstream.on('success', function() {
upstream._sinopia_success = true
if (upstreams.filter(function(upstream) {
return !upstream._sinopia_success
}).length === 0) {
stream.emit('success')
}
})
})
stream.abort = function() {
bail()
}
stream.done = function() {
upstreams.forEach(function(upstream) {
upstream.done()
})
}
return stream
} }
// //

View file

@ -1,55 +0,0 @@
var async = require('async')
//
// Function performs a certain task on a multiple uplinks
// and reverts changes if something fails
//
// uplinks - list of uplinks not counting local
// localAction, localRollback - function(cb)
// remoteAction, remoteRollback - function(uplink, cb)
//
module.exports = function(uplinks, localAction, localRollback, remoteAction, remoteRollback, callback) {
var uplink_ids = uplinks.map(function(_, i) {
return i
})
// localAction is first for the two reasons:
// 1. local error is much faster to detect, so we won't bother uplinks in these cases
// 2. we can handle looping cases correctly this way
localAction(function(err) {
if (err) {
err.uplink = 'local'
return callback(err)
}
async.map(uplink_ids, function(i, cb) {
remoteAction(uplinks[i], function(err) {
cb(null, err)
})
}, function(err, res) {
var return_err = err
// let err be first non-null element in the array
for (var i=0; i<res.length; i++) {
if (return_err) break
if (res[i]) {
return_err = res[i]
return_err.uplink = i
}
}
if (!return_err) return callback()
async.map(uplink_ids, function(i, cb) {
if (res[i]) return cb()
remoteRollback(uplinks[i], function() {
cb()
})
}, function(err) {
localRollback(function() {
callback(return_err)
})
})
})
})
}

View file

@ -205,96 +205,6 @@ Storage.prototype.can_fetch_url = function(url) {
&& url.path.indexOf(this.url.path) === 0 && url.path.indexOf(this.url.path) === 0
} }
Storage.prototype.add_package = function(name, metadata, options, callback) {
if (typeof(options) === 'function') callback = options, options = {}
this.request({
uri: '/' + encode(name),
method: 'PUT',
json: metadata,
}, function(err, res, body) {
if (err) return callback(err)
if (!(res.statusCode >= 200 && res.statusCode < 300)) {
return callback(new Error('bad status code: ' + res.statusCode))
}
callback(null, body)
})
}
Storage.prototype.add_version = function(name, version, metadata, tag, options, callback) {
if (typeof(options) === 'function') callback = options, options = {}
this.request({
uri: '/' + encode(name) + '/' + encode(version) + '/-tag/' + encode(tag),
method: 'PUT',
json: metadata,
}, function(err, res, body) {
if (err) return callback(err)
if (!(res.statusCode >= 200 && res.statusCode < 300)) {
return callback(new Error('bad status code: ' + res.statusCode))
}
callback(null, body)
})
}
Storage.prototype.add_tag = function(name, version, tag, options, callback) {
if (typeof(options) === 'function') callback = options, options = {}
this.request({
uri: '/' + encode(name) + '/' + encode(tag),
method: 'PUT',
json: JSON.stringify(version),
}, function(err, res, body) {
if (err) return callback(err)
if (!(res.statusCode >= 200 && res.statusCode < 300)) {
return callback(new Error('bad status code: ' + res.statusCode))
}
callback(null, body)
})
}
Storage.prototype.add_tarball = function(name, filename, options) {
if (!options) options = {}
var stream = new mystreams.UploadTarballStream()
, self = this
var wstream = this.request({
uri: '/' + encode(name) + '/-/' + encode(filename) + '/whatever',
method: 'PUT',
headers: {
'Content-Type': 'application/octet-stream'
},
})
wstream.on('response', function(res) {
if (!(res.statusCode >= 200 && res.statusCode < 300)) {
return stream.emit('error', new UError({
msg: 'bad uplink status code: ' + res.statusCode,
status: 500,
}))
}
stream.emit('success')
})
wstream.on('error', function(err) {
stream.emit('error', err)
})
stream.abort = function() {
process.nextTick(function() {
if (wstream.req) {
wstream._sinopia_aborted = true
wstream.req.abort()
}
})
}
stream.done = function() {}
stream.pipe(wstream)
return stream
}
Storage.prototype.get_package = function(name, options, callback) { Storage.prototype.get_package = function(name, options, callback) {
if (typeof(options) === 'function') callback = options, options = {} if (typeof(options) === 'function') callback = options, options = {}

View file

@ -42,16 +42,6 @@ module.exports = function() {
it(prefix+'uploading new package version', function(){}) it(prefix+'uploading new package version', function(){})
it(prefix+'downloading package via server2', function(cb) {
server2.get_package(pkg, function(res, body) {
assert.equal(res.statusCode, 200)
assert.equal(body.name, pkg)
assert.equal(body.versions['0.1.1'].name, pkg)
assert.equal(body.versions['0.1.1'].dist.tarball, 'http://localhost:55552/'+pkg+'/-/blahblah')
cb()
})
})
it(prefix+'uploading incomplete tarball', function(cb) { it(prefix+'uploading incomplete tarball', function(cb) {
server.put_tarball_incomplete(pkg, pkg+'.bad', readfile('fixtures/binary'), 3000, function(res, body) { server.put_tarball_incomplete(pkg, pkg+'.bad', readfile('fixtures/binary'), 3000, function(res, body) {
cb() cb()
@ -76,14 +66,6 @@ module.exports = function() {
cb() cb()
}) })
}) })
it(prefix+'downloading tarball from server2', function(cb) {
server2.get_tarball(pkg, pkg+'.file', function(res, body) {
assert.equal(res.statusCode, 200)
assert.deepEqual(body, readfile('fixtures/binary').toString('utf8'))
cb()
})
})
}) })
}) })
}) })

View file

@ -5,17 +5,21 @@ var assert = require('assert')
module.exports = function() { module.exports = function() {
describe('Security', function() { describe('Security', function() {
it('bad pkg #1', function(cb) {
server.get_package('package.json', function(res, body) { server.get_package('package.json', function(res, body) {
assert.equal(res.statusCode, 403) assert.equal(res.statusCode, 403)
assert(~body.error.indexOf('invalid package')) assert(~body.error.indexOf('invalid package'))
cb() cb()
}) })
})
it('bad pkg #2', function(cb) {
server.get_package('__proto__', function(res, body) { server.get_package('__proto__', function(res, body) {
assert.equal(res.statusCode, 403) assert.equal(res.statusCode, 403)
assert(~body.error.indexOf('invalid package')) assert(~body.error.indexOf('invalid package'))
cb() cb()
}) })
})
it('__proto__, connect stuff', function(cb) { it('__proto__, connect stuff', function(cb) {
server.request({uri:'/testpkg?__proto__=1'}, function(err, res, body) { server.request({uri:'/testpkg?__proto__=1'}, function(err, res, body) {

View file

@ -1,88 +0,0 @@
var transaction = require('../../lib/transaction')
var assert = require('assert')
function call_back(cb, value) {
setTimeout(function() {
cb(value)
}, Math.random()*30)
}
function test(uplinks, cb) {
var calls = []
var local = uplinks.shift()
transaction(
uplinks.map(
function(x, i) {return [i, x]}
),
function localAction(cb) {
calls.push('l')
call_back(cb, !local ? 'l' : null)
},
function localRollback(cb) {
calls.push('lb')
call_back(cb, true)
},
function remoteAction(remote, cb) {
calls.push('r'+remote[0])
call_back(cb, !remote[1] ? 'r'+remote[0] : null)
},
function remoteRollback(remote, cb) {
calls.push('rb'+remote[0])
call_back(cb, true)
},
function callback(err) {
cb(err, calls)
}
)
}
describe('Transaction', function() {
it('everything is fine', function(cb) {
test([true, true, true, true, true], function(err, calls) {
assert.deepEqual(err, undefined)
assert.deepEqual(calls, [ 'l', 'r0', 'r1', 'r2', 'r3' ])
cb()
})
})
it("local throws errors - don't call remotes", function(cb) {
test([false, true, true, true, true], function(err, calls) {
assert.deepEqual(err, 'l')
assert.deepEqual(calls, ['l'])
cb()
})
})
it('remote fails, call all rollbacks', function(cb) {
test([true, true, true, false, true], function(err, calls) {
assert.deepEqual(err, 'r2')
assert.deepEqual(calls, [ 'l', 'r0', 'r1', 'r2', 'r3', 'rb0', 'rb1', 'rb3', 'lb' ])
cb()
})
})
it('no remotes', function(cb) {
test([true], function(err, calls) {
assert.deepEqual(err, undefined)
assert.deepEqual(calls, [ 'l' ])
cb()
})
})
it('all remotes fail', function(cb) {
test([true, false, false, false, false], function(err, calls) {
assert.deepEqual(err, 'r0')
assert.deepEqual(calls, [ 'l', 'r0', 'r1', 'r2', 'r3', 'lb' ])
cb()
})
})
it('mix', function(cb) {
test([true, true, false, true, false], function(err, calls) {
assert.deepEqual(err, 'r1')
assert.deepEqual(calls, [ 'l', 'r0', 'r1', 'r2', 'r3', 'rb0', 'rb2', 'lb' ])
cb()
})
})
})