0
Fork 0
mirror of https://github.com/verdaccio/verdaccio.git synced 2024-12-30 22:34:10 -05:00

fixing race conditions when updating package data

This commit is contained in:
Alex Kocharin 2013-10-22 09:10:25 +04:00
parent e35c02f8f1
commit 8b314040d9
11 changed files with 293 additions and 34 deletions

View file

@ -1,4 +1,9 @@
?? ??? ????, version 0.5.2
- added fs-ext dependency (flock)
- fixed a few face conditions
20 Oct 2013, version 0.5.1 20 Oct 2013, version 0.5.1
- fixed a few errors related to logging - fixed a few errors related to logging

View file

@ -1,7 +1,8 @@
var fs = require('fs'); var fs = require('fs')
var Path = require('path'); , fsExt = require('fs-ext')
var mystreams = require('./streams'); , Path = require('path')
var FSError = require('./error').FSError; , mystreams = require('./streams')
, FSError = require('./error').FSError
function make_directories(dest, cb) { function make_directories(dest, cb) {
var dir = Path.dirname(dest); var dir = Path.dirname(dest);
@ -109,7 +110,51 @@ function update(name, contents, callback) {
} }
function read(name, callback) { function read(name, callback) {
fs.readFile(name, callback); fs.readFile(name, callback)
}
// open and flock with exponential backoff
function open_flock(name, opmod, flmod, tries, backoff, cb) {
fs.open(name, opmod, function(err, fd) {
if (err) return cb(err, fd)
fsExt.flock(fd, flmod, function(err) {
if (err) {
if (!tries) {
fs.close(fd, function() {
cb(err)
})
} else {
fs.close(fd, function() {
setTimeout(function() {
open_flock(name, opmod, flmod, tries-1, backoff*2, cb)
}, backoff)
})
}
} else {
cb(null, fd)
}
})
})
}
// this function neither unlocks file nor closes it
// it'll have to be done manually later
function lock_and_read(name, callback) {
open_flock(name, 'r', 'exnb', 4, 10, function(err, fd) {
if (err) return callback(err, fd)
fs.fstat(fd, function(err, st) {
if (err) return callback(err, fd)
var buffer = new Buffer(st.size)
fs.read(fd, buffer, 0, st.size, null, function(err, bytesRead, buffer) {
if (bytesRead != st.size) return callback(new Error('st.size != bytesRead'), fd)
callback(null, fd, buffer)
})
})
})
} }
function Storage(path) { function Storage(path) {
@ -123,7 +168,7 @@ function Storage(path) {
} }
Storage.prototype.read = function(name, cb) { Storage.prototype.read = function(name, cb) {
read(this.path + '/' + name, cb); read(this.path + '/' + name, cb)
} }
Storage.prototype.read_json = function(name, cb) { Storage.prototype.read_json = function(name, cb) {
@ -140,6 +185,24 @@ Storage.prototype.read_json = function(name, cb) {
}) })
} }
Storage.prototype.lock_and_read = function(name, cb) {
lock_and_read(this.path + '/' + name, cb)
}
Storage.prototype.lock_and_read_json = function(name, cb) {
lock_and_read(this.path + '/' + name, function(err, fd, res) {
if (err) return cb(err, fd)
var args = []
try {
args = [null, fd, JSON.parse(res.toString('utf8'))]
} catch(err) {
args = [err, fd]
}
cb.apply(null, args)
})
}
Storage.prototype.path_to = function(file) { Storage.prototype.path_to = function(file) {
return this.path + '/' + file return this.path + '/' + file
} }

View file

@ -140,23 +140,21 @@ Storage.prototype.update_versions = function(name, newdata, callback) {
} }
Storage.prototype.add_version = function(name, version, metadata, tag, callback) { Storage.prototype.add_version = function(name, version, metadata, tag, callback) {
var self = this; var self = this
self._read_create_package(name, function(err, data) { self.update_package(name, function updater(data, cb) {
// why does anyone need to keep that in database? // why does anyone need to keep that in database?
delete metadata.readme; delete metadata.readme
if (err) return callback(err);
if (data.versions[version] != null) { if (data.versions[version] != null) {
return callback(new UError({ return cb(new UError({
status: 409, status: 409,
msg: 'this version already present' msg: 'this version already present'
})); }))
} }
data.versions[version] = metadata; data.versions[version] = metadata
data['dist-tags'][tag] = version; data['dist-tags'][tag] = version
self.storage.update_json(name + '/' + info_file, data, callback); cb()
}); }, callback)
} }
Storage.prototype.add_tarball = function(name, filename) { Storage.prototype.add_tarball = function(name, filename) {
@ -269,5 +267,64 @@ Storage.prototype.get_package = function(name, callback) {
}) })
} }
//
// This function allows to update the package thread-safely
//
// Arguments:
// - name - package name
// - updateFn - function(package, cb) - update function
// - callback - callback that gets invoked after it's all updated
//
// Algorithm:
// 1. lock package.json for writing
// 2. read package.json
// 3. updateFn(pkg, cb), and wait for cb
// 4. write package.json.tmp
// 5. move package.json.tmp package.json
// 6. callback(err?)
//
Storage.prototype.update_package = function(name, updateFn, _callback) {
var self = this
, file = name + '/' + info_file
self.storage.lock_and_read_json(file, function(err, fd, json) {
self.logger.debug({file: file}, 'locking @{file}')
function callback() {
self.logger.debug({file: file}, 'unlocking @{file}')
var _args = arguments
if (fd) {
fs.close(fd, function(err) {
if (err) return _callback(err)
_callback.apply(null, _args)
})
} else {
_callback.apply(null, _args)
}
}
if (err) {
if (err.code === 'EAGAIN') {
return callback(new UError({
status: 503,
msg: 'resource temporarily unavailable'
}))
} else if (err.code === 'ENOENT') {
return callback(new UError({
status: 404,
msg: 'no such package available',
}))
} else {
return callback(err)
}
}
updateFn(json, function(err) {
if (err) return callback(err)
self.storage.write_json(name + '/' + info_file, json, callback)
})
})
}
module.exports = Storage; module.exports = Storage;

View file

@ -27,6 +27,7 @@ dependencies:
semver: '*' semver: '*'
minimatch: '*' minimatch: '*'
bunyan: '>= 0.16.4' bunyan: '>= 0.16.4'
fs-ext: '*'
devDependencies: devDependencies:
rimraf: '*' rimraf: '*'

View file

@ -59,6 +59,14 @@ ex['downloading newly created tarball'] = function(cb) {
}); });
}; };
ex['uploading new package version for bad pkg'] = function(cb) {
server.put_version('testpxg', '0.0.1', require('./lib/package')('testpxg'), function(res, body) {
assert.equal(res.statusCode, 404);
assert(~body.error.indexOf('no such package'));
cb();
});
};
ex['uploading new package version'] = function(cb) { ex['uploading new package version'] = function(cb) {
server.put_version('testpkg', '0.0.1', require('./lib/package')('testpkg'), function(res, body) { server.put_version('testpkg', '0.0.1', require('./lib/package')('testpkg'), function(res, body) {
assert.equal(res.statusCode, 201); assert.equal(res.statusCode, 201);

View file

@ -8,6 +8,9 @@ uplinks:
server2: server2:
url: http://localhost:55552/ url: http://localhost:55552/
logs:
- {type: stdout, format: pretty, level: trace}
packages: packages:
'testfwd*': 'testfwd*':
allow_access: all allow_access: all

13
test/config.js Normal file
View file

@ -0,0 +1,13 @@
var assert = require('assert');
var ex = module.exports;
ex['trying to fetch non-existent package'] = function(cb) {
var f = fork('../bin/sinopia', ['-c', './config/log-1.yaml'], {silent: true});
f.on('message', function(msg) {
if ('sinopia_started' in msg) {
f.kill();
cb();
}
});
};

View file

@ -5,6 +5,7 @@ function Server(url) {
if (!(this instanceof Server)) return new Server(url); if (!(this instanceof Server)) return new Server(url);
this.url = url.replace(/\/$/, ''); this.url = url.replace(/\/$/, '');
this.userAgent = 'node/v0.10.8 linux x64'; this.userAgent = 'node/v0.10.8 linux x64';
this.authstr = 'Basic '+(new Buffer('test:test')).toString('base64');
} }
function prep(cb) { function prep(cb) {
@ -18,7 +19,7 @@ Server.prototype.request = function(options, cb) {
var headers = options.headers || {}; var headers = options.headers || {};
headers.accept = headers.accept || 'application/json'; headers.accept = headers.accept || 'application/json';
headers['user-agent'] = headers['user-agent'] || this.userAgent; headers['user-agent'] = headers['user-agent'] || this.userAgent;
headers.authorization = headers.authorization || this.auth; headers.authorization = headers.authorization || this.authstr;
return request({ return request({
url: this.url + options.uri, url: this.url + options.uri,
method: options.method || 'GET', method: options.method || 'GET',
@ -28,7 +29,7 @@ Server.prototype.request = function(options, cb) {
} }
Server.prototype.auth = function(user, pass, cb) { Server.prototype.auth = function(user, pass, cb) {
this.auth = 'Basic '+(new Buffer(user+':'+pass)).toString('base64'); this.authstr = 'Basic '+(new Buffer(user+':'+pass)).toString('base64');
this.request({ this.request({
uri: '/-/user/org.couchdb.user:'+escape(user)+'/-rev/undefined', uri: '/-/user/org.couchdb.user:'+escape(user)+'/-rev/undefined',
method: 'PUT', method: 'PUT',

86
test/race.js Normal file
View file

@ -0,0 +1,86 @@
var assert = require('assert')
, readfile = require('fs').readFileSync
, ex = module.exports
, server = process.server
, server2 = process.server2
, async = require('async')
, _oksum = 0
ex['creating new package'] = function(cb) {
server.put_package('race', require('./lib/package')('race'), function(res, body) {
assert.equal(res.statusCode, 201)
assert(~body.ok.indexOf('created new package'))
cb()
})
}
ex['uploading 10 same versions'] = function(cb) {
var fns = []
for (var i=0; i<10; i++) {
fns.push(function(cb_) {
var data = require('./lib/package')('race')
data.rand = Math.random()
server.put_version('race', '0.0.1', data, function(res, body) {
cb_(null, res, body)
})
})
}
async.parallel(fns, function(err, res) {
var okcount = 0
, failcount = 0
res.forEach(function(arr) {
var resp = arr[0]
, body = arr[1]
if (resp.statusCode === 201 && ~body.ok.indexOf('published')) okcount++
if (resp.statusCode === 409 && ~body.error.indexOf('already present')) failcount++
if (resp.statusCode === 503 && ~body.error.indexOf('unavailable')) failcount++
})
assert.equal(okcount + failcount, 10)
assert.equal(okcount, 1)
_oksum += okcount
cb()
})
}
ex['uploading 10 diff versions'] = function(cb) {
var fns = []
for (var i=0; i<10; i++) {
;(function(i) {
fns.push(function(cb_) {
server.put_version('race', '0.1.'+String(i), require('./lib/package')('race'), function(res, body) {
cb_(null, res, body)
})
})
})(i)
}
async.parallel(fns, function(err, res) {
var okcount = 0
, failcount = 0
res.forEach(function(arr) {
var resp = arr[0]
, body = arr[1]
if (resp.statusCode === 201 && ~body.ok.indexOf('published')) okcount++
if (resp.statusCode === 409 && ~body.error.indexOf('already present')) failcount++
if (resp.statusCode === 503 && ~body.error.indexOf('unavailable')) failcount++
})
assert.equal(okcount + failcount, 10)
_oksum += okcount
cb()
})
}
ex['downloading package'] = function(cb) {
server.get_package('race', function(res, body) {
assert.equal(res.statusCode, 200)
assert.equal(Object.keys(body.versions).length, _oksum)
cb()
})
}

21
test/repl.js Executable file
View file

@ -0,0 +1,21 @@
#!/usr/bin/env node
var Server = require('./lib/server')
, forks = process.forks = []
, server = process.server = new Server('http://localhost:55551/')
, server2 = process.server2 = new Server('http://localhost:55552/')
process.on('exit', function() {
if (forks[0]) forks[0].kill()
if (forks[1]) forks[1].kill()
})
var repl = require('repl').start({
prompt: "> ",
input: process.stdin,
output: process.stdout,
})
repl.context.server = server
repl.context.server2 = server2

View file

@ -1,20 +1,21 @@
var fs = require('fs'); var fs = require('fs')
var async = require('async'); , async = require('async')
var assert = require('assert'); , assert = require('assert')
var Server = require('./lib/server'); , Server = require('./lib/server')
var readfile = require('fs').readFileSync; , readfile = require('fs').readFileSync
var ex = module.exports; , ex = module.exports
var forks = process.forks = []; var forks = process.forks = []
process.server = new Server('http://localhost:55551/'); process.server = new Server('http://localhost:55551/')
process.server2 = new Server('http://localhost:55552/'); process.server2 = new Server('http://localhost:55552/')
ex['Startup:'] = require('./startup'); ex['Startup:'] = require('./startup')
ex['Basic:'] = require('./basic'); ex['Basic:'] = require('./basic')
ex['Mirror:'] = require('./mirror'); ex['Mirror:'] = require('./mirror')
ex['Race:'] = require('./race')
process.on('exit', function() { process.on('exit', function() {
if (forks[0]) forks[0].kill(); if (forks[0]) forks[0].kill()
if (forks[1]) forks[1].kill(); if (forks[1]) forks[1].kill()
}); })