Browse Source

Merge pull request #175 from isocolsky/locks

Locks
activeAddress
Matias Alejo Garcia 10 years ago
parent
commit
ab91edd035
  1. 8
      config.js
  2. 63
      lib/locallock.js
  3. 48
      lib/lock.js
  4. 27
      lib/server.js
  5. 38
      lib/storage.js
  6. 16
      lib/utils.js
  7. 6
      locker-server.js
  8. 2
      package.json
  9. 111
      test/locallock.js
  10. 50
      test/lock.js

8
config.js

@ -21,6 +21,14 @@ var config = {
}, },
*/ */
}, },
lockOpts: {
/* To use locker-server, uncomment this:
lockerServer: {
host: 'localhost',
port: 3003,
},
*/
},
}, },
}; };
module.exports = config; module.exports = config;

63
lib/locallock.js

@ -0,0 +1,63 @@
var _ = require('lodash');
var $ = require('preconditions').singleton();
function Lock() {
this.tasks = {};
};
Lock.prototype._release = function(token, task) {
if (!task.running) return;
task.running = false;
this.tasks[token] = _.without(this.tasks[token], task);
this._runOne(token);
};
Lock.prototype._runOne = function(token) {
var self = this;
if (_.any(self.tasks[token], {
running: true
})) return;
var task = _.first(self.tasks[token]);
if (!task) return;
task.running = true;
if (task.timeout > 0) {
setTimeout(function() {
self._release(token, task);
}, task.timeout);
}
task.fn(null, function() {
self._release(token, task);
});
};
Lock.prototype.locked = function(token, wait, max, userTask) {
var self = this;
if (_.isUndefined(self.tasks[token])) {
self.tasks[token] = [];
}
var task = {
timeout: max,
running: false,
fn: userTask,
};
self.tasks[token].push(task);
if (wait > 0) {
setTimeout(function() {
if (task.running || !_.contains(self.tasks[token], task)) return;
self.tasks[token] = _.without(self.tasks[token], task);
task.fn(new Error('Could not acquire lock ' + token));
}, wait);
}
self._runOne(token);
};
module.exports = Lock;

48
lib/lock.js

@ -1,34 +1,38 @@
var _ = require('lodash');
var $ = require('preconditions').singleton(); var $ = require('preconditions').singleton();
var _ = require('lodash');
var log = require('npmlog');
log.debug = log.verbose;
var locks = {}; var LocalLock = require('./locallock');
var RemoteLock = require('locker');
var Lock = function() { function Lock(opts) {
this.taken = false; opts = opts || {};
this.queue = []; if (opts.lockerServer) {
}; this.lock = new RemoteLock(opts.lockerServer.port, opts.lockerServer.host);
Lock.prototype.free = function() { this.lock.on('reset', function() {
if (this.queue.length > 0) { log.debug('Locker server reset');
var f = this.queue.shift(); });
f(this); this.lock.on('error', function(error) {
log.error('Locker server threw error', error);
});
} else { } else {
this.taken = false; this.lock = new LocalLock();
} }
}; };
Lock.get = function(key, callback) { Lock.prototype.runLocked = function(token, cb, task) {
if (_.isUndefined(locks[key])) { $.shouldBeDefined(token);
locks[key] = new Lock();
}
var lock = locks[key];
if (lock.taken) { this.lock.locked(token, 5 * 1000, 24 * 60 * 60 * 1000, function(err, release) {
lock.queue.push(callback); if (err) return cb(new Error('Wallet is locked'));
} else { var _cb = function() {
lock.taken = true; cb.apply(null, arguments);
callback(lock); release();
} };
task(_cb);
});
}; };
module.exports = Lock; module.exports = Lock;

27
lib/server.js

@ -13,6 +13,7 @@ var Address = Bitcore.Address;
var ClientError = require('./clienterror'); var ClientError = require('./clienterror');
var Utils = require('./utils'); var Utils = require('./utils');
var Lock = require('./lock');
var Storage = require('./storage'); var Storage = require('./storage');
var NotificationBroadcaster = require('./notificationbroadcaster'); var NotificationBroadcaster = require('./notificationbroadcaster');
var BlockchainExplorer = require('./blockchainexplorer'); var BlockchainExplorer = require('./blockchainexplorer');
@ -24,7 +25,7 @@ var TxProposal = require('./model/txproposal');
var Notification = require('./model/notification'); var Notification = require('./model/notification');
var initialized = false; var initialized = false;
var storage, blockchainExplorer; var lock, storage, blockchainExplorer;
/** /**
@ -35,6 +36,7 @@ function WalletService() {
if (!initialized) if (!initialized)
throw new Error('Server not initialized'); throw new Error('Server not initialized');
this.lock = lock;
this.storage = storage; this.storage = storage;
this.blockchainExplorer = blockchainExplorer; this.blockchainExplorer = blockchainExplorer;
this.notifyTicker = 0; this.notifyTicker = 0;
@ -52,7 +54,8 @@ WalletService.onNotification = function(func) {
*/ */
WalletService.initialize = function(opts) { WalletService.initialize = function(opts) {
opts = opts || {}; opts = opts || {};
storage = opts.storage ||  new Storage(opts.storageOpts); lock = opts.lock || new Lock(opts.lockOpts);
storage = opts.storage || new Storage(opts.storageOpts);
blockchainExplorer = opts.blockchainExplorer; blockchainExplorer = opts.blockchainExplorer;
initialized = true; initialized = true;
}; };
@ -88,6 +91,11 @@ WalletService.getInstanceWithAuth = function(opts, cb) {
}); });
}; };
WalletService.prototype._runLocked = function(cb, task) {
$.checkState(this.walletId);
this.lock.runLocked(this.walletId, cb, task);
};
/** /**
* Creates a new wallet. * Creates a new wallet.
@ -190,7 +198,7 @@ WalletService.prototype.replaceTemporaryRequestKey = function(opts, cb) {
if (opts.isTemporaryRequestKey) if (opts.isTemporaryRequestKey)
return cb(new ClientError('Bad arguments')); return cb(new ClientError('Bad arguments'));
Utils.runLocked(self.walletId, cb, function(cb) { self._runLocked(cb, function(cb) {
self.storage.fetchWallet(self.walletId, function(err, wallet) { self.storage.fetchWallet(self.walletId, function(err, wallet) {
if (err) return cb(err); if (err) return cb(err);
@ -298,7 +306,8 @@ WalletService.prototype.joinWallet = function(opts, cb) {
if (_.isEmpty(opts.name)) if (_.isEmpty(opts.name))
return cb(new ClientError('Invalid copayer name')); return cb(new ClientError('Invalid copayer name'));
Utils.runLocked(opts.walletId, cb, function(cb) { self.walletId = opts.walletId;
self._runLocked(cb, function(cb) {
self.storage.fetchWallet(opts.walletId, function(err, wallet) { self.storage.fetchWallet(opts.walletId, function(err, wallet) {
if (err) return cb(err); if (err) return cb(err);
@ -357,7 +366,7 @@ WalletService.prototype.joinWallet = function(opts, cb) {
WalletService.prototype.createAddress = function(opts, cb) { WalletService.prototype.createAddress = function(opts, cb) {
var self = this; var self = this;
Utils.runLocked(self.walletId, cb, function(cb) { self._runLocked(cb, function(cb) {
self.getWallet({}, function(err, wallet) { self.getWallet({}, function(err, wallet) {
if (err) return cb(err); if (err) return cb(err);
if (!wallet.isComplete()) if (!wallet.isComplete())
@ -612,7 +621,7 @@ WalletService.prototype.createTx = function(opts, cb) {
if (!Utils.checkRequired(opts, ['toAddress', 'amount', 'proposalSignature'])) if (!Utils.checkRequired(opts, ['toAddress', 'amount', 'proposalSignature']))
return cb(new ClientError('Required argument missing')); return cb(new ClientError('Required argument missing'));
Utils.runLocked(self.walletId, cb, function(cb) { self._runLocked(cb, function(cb) {
self.getWallet({}, function(err, wallet) { self.getWallet({}, function(err, wallet) {
if (err) return cb(err); if (err) return cb(err);
if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete')); if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete'));
@ -702,7 +711,7 @@ WalletService.prototype.getTx = function(opts, cb) {
WalletService.prototype.removeWallet = function(opts, cb) { WalletService.prototype.removeWallet = function(opts, cb) {
var self = this; var self = this;
Utils.runLocked(self.walletId, cb, function(cb) { self._runLocked(cb, function(cb) {
self.storage.removeWallet(self.walletId, cb); self.storage.removeWallet(self.walletId, cb);
}); });
}; };
@ -720,7 +729,7 @@ WalletService.prototype.removePendingTx = function(opts, cb) {
if (!Utils.checkRequired(opts, ['txProposalId'])) if (!Utils.checkRequired(opts, ['txProposalId']))
return cb(new ClientError('Required argument missing')); return cb(new ClientError('Required argument missing'));
Utils.runLocked(self.walletId, cb, function(cb) { self._runLocked(cb, function(cb) {
self.getTx({ self.getTx({
txProposalId: opts.txProposalId, txProposalId: opts.txProposalId,
@ -1178,7 +1187,7 @@ WalletService.prototype.scan = function(opts, cb) {
}; };
Utils.runLocked(self.walletId, cb, function(cb) { self._runLocked(cb, function(cb) {
self.getWallet({}, function(err, wallet) { self.getWallet({}, function(err, wallet) {
if (err) return cb(err); if (err) return cb(err);
if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete')); if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete'));

38
lib/storage.js

@ -147,9 +147,9 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) {
var txs = []; var txs = [];
var key = KEY.PENDING_TXP(walletId); var key = KEY.PENDING_TXP(walletId);
this.db.createReadStream({ this.db.createReadStream({
gte: key, gte: key,
lt: key + '~' lt: key + '~'
}) })
.on('data', function(data) { .on('data', function(data) {
txs.push(TxProposal.fromObj(data.value)); txs.push(TxProposal.fromObj(data.value));
}) })
@ -183,11 +183,11 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) {
var endkey = KEY.TXP(walletId, opts.maxTs); var endkey = KEY.TXP(walletId, opts.maxTs);
this.db.createReadStream({ this.db.createReadStream({
gt: key, gt: key,
lt: endkey + '~', lt: endkey + '~',
reverse: true, reverse: true,
limit: opts.limit, limit: opts.limit,
}) })
.on('data', function(data) { .on('data', function(data) {
txs.push(TxProposal.fromObj(data.value)); txs.push(TxProposal.fromObj(data.value));
}) })
@ -220,11 +220,11 @@ Storage.prototype.fetchNotifications = function(walletId, opts, cb) {
var endkey = KEY.NOTIFICATION(walletId, opts.maxTs); var endkey = KEY.NOTIFICATION(walletId, opts.maxTs);
this.db.createReadStream({ this.db.createReadStream({
gt: key, gt: key,
lt: endkey + '~', lt: endkey + '~',
reverse: opts.reverse, reverse: opts.reverse,
limit: opts.limit, limit: opts.limit,
}) })
.on('data', function(data) { .on('data', function(data) {
txs.push(Notification.fromObj(data.value)); txs.push(Notification.fromObj(data.value));
}) })
@ -284,9 +284,9 @@ Storage.prototype._delByKey = function(key, cb) {
var self = this; var self = this;
var keys = []; var keys = [];
this.db.createKeyStream({ this.db.createKeyStream({
gte: key, gte: key,
lt: key + '~', lt: key + '~',
}) })
.on('data', function(key) { .on('data', function(key) {
keys.push(key); keys.push(key);
}) })
@ -341,9 +341,9 @@ Storage.prototype.fetchAddresses = function(walletId, cb) {
var addresses = []; var addresses = [];
var key = KEY.ADDRESS(walletId); var key = KEY.ADDRESS(walletId);
this.db.createReadStream({ this.db.createReadStream({
gte: key, gte: key,
lt: key + '~' lt: key + '~'
}) })
.on('data', function(data) { .on('data', function(data) {
addresses.push(Address.fromObj(data.value)); addresses.push(Address.fromObj(data.value));
}) })

16
lib/utils.js

@ -1,24 +1,8 @@
var $ = require('preconditions').singleton(); var $ = require('preconditions').singleton();
var _ = require('lodash'); var _ = require('lodash');
var Lock = require('./lock');
var Utils = {}; var Utils = {};
Utils.runLocked = function(token, cb, task) {
var self = this;
$.shouldBeDefined(token);
Lock.get(token, function(lock) {
var _cb = function() {
cb.apply(null, arguments);
lock.free();
};
task(_cb);
});
};
Utils.checkRequired = function(obj, args) { Utils.checkRequired = function(obj, args) {
args = [].concat(args); args = [].concat(args);
if (!_.isObject(obj)) return false; if (!_.isObject(obj)) return false;

6
locker-server.js

@ -0,0 +1,6 @@
(function() {
var Locker = require('locker-server'),
locker = new Locker();
locker.listen(3003);
})();

2
package.json

@ -28,6 +28,8 @@
"inherits": "^2.0.1", "inherits": "^2.0.1",
"leveldown": "^0.10.0", "leveldown": "^0.10.0",
"levelup": "^0.19.0", "levelup": "^0.19.0",
"locker": "^0.1.0",
"locker-server": "^0.1.3",
"lodash": "^3.3.1", "lodash": "^3.3.1",
"mocha-lcov-reporter": "0.0.1", "mocha-lcov-reporter": "0.0.1",
"morgan": "*", "morgan": "*",

111
test/locallock.js

@ -0,0 +1,111 @@
'use strict';
var _ = require('lodash');
var chai = require('chai');
var sinon = require('sinon');
var should = chai.should();
var Lock = require('../lib/locallock');
describe('Local locks', function() {
var lock;
beforeEach(function() {
this.clock = sinon.useFakeTimers();
lock = new Lock();
});
afterEach(function() {
this.clock.restore();
});
it('should lock tasks using the same token', function() {
var a = false,
b = false;
lock.locked('123', 0, 0, function(err, release) {
should.not.exist(err);
a = true;
setTimeout(function() {
release();
}, 5);
lock.locked('123', 0, 0, function(err, release) {
should.not.exist(err);
b = true;
release();
});
});
a.should.equal(true);
b.should.equal(false);
this.clock.tick(10);
a.should.equal(true);
b.should.equal(true);
});
it('should not lock tasks using different tokens', function() {
var i = 0;
lock.locked('123', 0, 0, function(err, release) {
should.not.exist(err);
i++;
setTimeout(function() {
release();
}, 5);
lock.locked('456', 0, 0, function(err, release) {
should.not.exist(err);
i++;
release();
});
});
i.should.equal(2);
});
it('should return error if unable to acquire lock', function() {
lock.locked('123', 0, 0, function(err, release) {
should.not.exist(err);
setTimeout(function() {
release();
}, 5);
lock.locked('123', 1, 0, function(err, release) {
should.exist(err);
err.toString().should.contain('Could not acquire lock 123');
});
});
this.clock.tick(2);
});
it('should release lock if acquired for a long time', function() {
var i = 0;
lock.locked('123', 0, 3, function(err, release) {
should.not.exist(err);
i++;
lock.locked('123', 20, 0, function(err, release) {
should.not.exist(err);
i++;
release();
});
});
i.should.equal(1);
this.clock.tick(1);
i.should.equal(1);
this.clock.tick(10);
i.should.equal(2);
});
it('should only release one pending task on lock timeout', function() {
var i = 0;
lock.locked('123', 0, 3, function(err, release) {
should.not.exist(err);
i++;
lock.locked('123', 5, 0, function(err, release) {
should.not.exist(err);
i++;
setTimeout(function() {
release();
}, 5);
});
lock.locked('123', 20, 0, function(err, release) {
should.not.exist(err);
i++;
release();
});
});
i.should.equal(1);
this.clock.tick(4);
i.should.equal(2)
this.clock.tick(7);
i.should.equal(3)
});
});

50
test/lock.js

@ -1,50 +0,0 @@
'use strict';
var _ = require('lodash');
var chai = require('chai');
var sinon = require('sinon');
var should = chai.should();
var Lock = require('../lib/lock');
describe('Lock', function() {
it('should lock tasks using the same token', function(done) {
var a = false,
b = false;
Lock.get('123', function(lock) {
a = true;
setTimeout(function() {
lock.free();
}, 5);
Lock.get('123', function(lock) {
b = true;
lock.free();
});
});
setTimeout(function() {
a.should.equal(true);
b.should.equal(false);
}, 1);
setTimeout(function() {
a.should.equal(true);
b.should.equal(true);
done();
}, 8);
});
it('should not lock tasks using different tokens', function(done) {
var i = 0;
Lock.get('123', function(lock) {
i++;
setTimeout(function() {
lock.free();
}, 5);
Lock.get('456', function(lock) {
i++;
lock.free();
});
});
setTimeout(function() {
i.should.equal(2);
done();
}, 1);
});
});
Loading…
Cancel
Save