diff --git a/config.js b/config.js index c13db93..c25aabc 100644 --- a/config.js +++ b/config.js @@ -21,6 +21,14 @@ var config = { }, */ }, + lockOpts: { + /* To use locker-server, uncomment this: + lockerServer: { + host: 'localhost', + port: 3003, + }, + */ + }, }, }; module.exports = config; diff --git a/lib/locallock.js b/lib/locallock.js new file mode 100644 index 0000000..cddc50c --- /dev/null +++ b/lib/locallock.js @@ -0,0 +1,63 @@ +var _ = require('lodash'); +var $ = require('preconditions').singleton(); + +function Lock() { + this.tasks = {}; +}; + +Lock.prototype._release = function(token, task) { + if (!task.running) return; + task.running = false; + this.tasks[token] = _.without(this.tasks[token], task); + this._runOne(token); +}; + +Lock.prototype._runOne = function(token) { + var self = this; + + if (_.any(self.tasks[token], { + running: true + })) return; + + var task = _.first(self.tasks[token]); + if (!task) return; + + task.running = true; + + if (task.timeout > 0) { + setTimeout(function() { + self._release(token, task); + }, task.timeout); + } + + task.fn(null, function() { + self._release(token, task); + }); +}; + +Lock.prototype.locked = function(token, wait, max, userTask) { + var self = this; + + if (_.isUndefined(self.tasks[token])) { + self.tasks[token] = []; + } + + var task = { + timeout: max, + running: false, + fn: userTask, + }; + self.tasks[token].push(task); + + if (wait > 0) { + setTimeout(function() { + if (task.running || !_.contains(self.tasks[token], task)) return; + self.tasks[token] = _.without(self.tasks[token], task); + task.fn(new Error('Could not acquire lock ' + token)); + }, wait); + } + + self._runOne(token); +}; + +module.exports = Lock; diff --git a/lib/lock.js b/lib/lock.js index 6ca71a4..dda8e9d 100644 --- a/lib/lock.js +++ b/lib/lock.js @@ -1,34 +1,38 @@ -var _ = require('lodash'); var $ = require('preconditions').singleton(); +var _ = require('lodash'); +var log = require('npmlog'); +log.debug = log.verbose; -var locks = {}; +var LocalLock = require('./locallock'); +var RemoteLock = require('locker'); -var Lock = function() { - this.taken = false; - this.queue = []; -}; +function Lock(opts) { + opts = opts || {}; + if (opts.lockerServer) { + this.lock = new RemoteLock(opts.lockerServer.port, opts.lockerServer.host); -Lock.prototype.free = function() { - if (this.queue.length > 0) { - var f = this.queue.shift(); - f(this); + this.lock.on('reset', function() { + log.debug('Locker server reset'); + }); + this.lock.on('error', function(error) { + log.error('Locker server threw error', error); + }); } else { - this.taken = false; + this.lock = new LocalLock(); } }; -Lock.get = function(key, callback) { - if (_.isUndefined(locks[key])) { - locks[key] = new Lock(); - } - var lock = locks[key]; +Lock.prototype.runLocked = function(token, cb, task) { + $.shouldBeDefined(token); - if (lock.taken) { - lock.queue.push(callback); - } else { - lock.taken = true; - callback(lock); - } + this.lock.locked(token, 5 * 1000, 24 * 60 * 60 * 1000, function(err, release) { + if (err) return cb(new Error('Wallet is locked')); + var _cb = function() { + cb.apply(null, arguments); + release(); + }; + task(_cb); + }); }; module.exports = Lock; diff --git a/lib/server.js b/lib/server.js index 9f804b8..948b7c0 100644 --- a/lib/server.js +++ b/lib/server.js @@ -13,6 +13,7 @@ var Address = Bitcore.Address; var ClientError = require('./clienterror'); var Utils = require('./utils'); +var Lock = require('./lock'); var Storage = require('./storage'); var NotificationBroadcaster = require('./notificationbroadcaster'); var BlockchainExplorer = require('./blockchainexplorer'); @@ -24,7 +25,7 @@ var TxProposal = require('./model/txproposal'); var Notification = require('./model/notification'); var initialized = false; -var storage, blockchainExplorer; +var lock, storage, blockchainExplorer; /** @@ -35,6 +36,7 @@ function WalletService() { if (!initialized) throw new Error('Server not initialized'); + this.lock = lock; this.storage = storage; this.blockchainExplorer = blockchainExplorer; this.notifyTicker = 0; @@ -52,7 +54,8 @@ WalletService.onNotification = function(func) { */ WalletService.initialize = function(opts) { opts = opts || {}; - storage = opts.storage ||  new Storage(opts.storageOpts); + lock = opts.lock || new Lock(opts.lockOpts); + storage = opts.storage || new Storage(opts.storageOpts); blockchainExplorer = opts.blockchainExplorer; initialized = true; }; @@ -88,6 +91,11 @@ WalletService.getInstanceWithAuth = function(opts, cb) { }); }; +WalletService.prototype._runLocked = function(cb, task) { + $.checkState(this.walletId); + this.lock.runLocked(this.walletId, cb, task); +}; + /** * Creates a new wallet. @@ -190,7 +198,7 @@ WalletService.prototype.replaceTemporaryRequestKey = function(opts, cb) { if (opts.isTemporaryRequestKey) return cb(new ClientError('Bad arguments')); - Utils.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.storage.fetchWallet(self.walletId, function(err, wallet) { if (err) return cb(err); @@ -298,7 +306,8 @@ WalletService.prototype.joinWallet = function(opts, cb) { if (_.isEmpty(opts.name)) return cb(new ClientError('Invalid copayer name')); - Utils.runLocked(opts.walletId, cb, function(cb) { + self.walletId = opts.walletId; + self._runLocked(cb, function(cb) { self.storage.fetchWallet(opts.walletId, function(err, wallet) { if (err) return cb(err); @@ -357,7 +366,7 @@ WalletService.prototype.joinWallet = function(opts, cb) { WalletService.prototype.createAddress = function(opts, cb) { var self = this; - Utils.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.getWallet({}, function(err, wallet) { if (err) return cb(err); if (!wallet.isComplete()) @@ -612,7 +621,7 @@ WalletService.prototype.createTx = function(opts, cb) { if (!Utils.checkRequired(opts, ['toAddress', 'amount', 'proposalSignature'])) return cb(new ClientError('Required argument missing')); - Utils.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.getWallet({}, function(err, wallet) { if (err) return cb(err); if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete')); @@ -702,7 +711,7 @@ WalletService.prototype.getTx = function(opts, cb) { WalletService.prototype.removeWallet = function(opts, cb) { var self = this; - Utils.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.storage.removeWallet(self.walletId, cb); }); }; @@ -720,7 +729,7 @@ WalletService.prototype.removePendingTx = function(opts, cb) { if (!Utils.checkRequired(opts, ['txProposalId'])) return cb(new ClientError('Required argument missing')); - Utils.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.getTx({ txProposalId: opts.txProposalId, @@ -1178,7 +1187,7 @@ WalletService.prototype.scan = function(opts, cb) { }; - Utils.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.getWallet({}, function(err, wallet) { if (err) return cb(err); if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete')); diff --git a/lib/storage.js b/lib/storage.js index 32583c8..e595b7d 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -147,9 +147,9 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) { var txs = []; var key = KEY.PENDING_TXP(walletId); this.db.createReadStream({ - gte: key, - lt: key + '~' - }) + gte: key, + lt: key + '~' + }) .on('data', function(data) { txs.push(TxProposal.fromObj(data.value)); }) @@ -183,11 +183,11 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) { var endkey = KEY.TXP(walletId, opts.maxTs); this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: true, - limit: opts.limit, - }) + gt: key, + lt: endkey + '~', + reverse: true, + limit: opts.limit, + }) .on('data', function(data) { txs.push(TxProposal.fromObj(data.value)); }) @@ -220,11 +220,11 @@ Storage.prototype.fetchNotifications = function(walletId, opts, cb) { var endkey = KEY.NOTIFICATION(walletId, opts.maxTs); this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: opts.reverse, - limit: opts.limit, - }) + gt: key, + lt: endkey + '~', + reverse: opts.reverse, + limit: opts.limit, + }) .on('data', function(data) { txs.push(Notification.fromObj(data.value)); }) @@ -284,9 +284,9 @@ Storage.prototype._delByKey = function(key, cb) { var self = this; var keys = []; this.db.createKeyStream({ - gte: key, - lt: key + '~', - }) + gte: key, + lt: key + '~', + }) .on('data', function(key) { keys.push(key); }) @@ -341,9 +341,9 @@ Storage.prototype.fetchAddresses = function(walletId, cb) { var addresses = []; var key = KEY.ADDRESS(walletId); this.db.createReadStream({ - gte: key, - lt: key + '~' - }) + gte: key, + lt: key + '~' + }) .on('data', function(data) { addresses.push(Address.fromObj(data.value)); }) diff --git a/lib/utils.js b/lib/utils.js index b39fb7a..aadc018 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,24 +1,8 @@ var $ = require('preconditions').singleton(); var _ = require('lodash'); -var Lock = require('./lock'); var Utils = {}; -Utils.runLocked = function(token, cb, task) { - var self = this; - - $.shouldBeDefined(token); - - Lock.get(token, function(lock) { - var _cb = function() { - cb.apply(null, arguments); - lock.free(); - }; - task(_cb); - }); -}; - - Utils.checkRequired = function(obj, args) { args = [].concat(args); if (!_.isObject(obj)) return false; diff --git a/locker-server.js b/locker-server.js new file mode 100644 index 0000000..4610684 --- /dev/null +++ b/locker-server.js @@ -0,0 +1,6 @@ +(function() { + var Locker = require('locker-server'), + locker = new Locker(); + + locker.listen(3003); +})(); diff --git a/package.json b/package.json index fe64798..7f25b9b 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,8 @@ "inherits": "^2.0.1", "leveldown": "^0.10.0", "levelup": "^0.19.0", + "locker": "^0.1.0", + "locker-server": "^0.1.3", "lodash": "^3.3.1", "mocha-lcov-reporter": "0.0.1", "morgan": "*", diff --git a/test/locallock.js b/test/locallock.js new file mode 100644 index 0000000..5604511 --- /dev/null +++ b/test/locallock.js @@ -0,0 +1,111 @@ +'use strict'; + +var _ = require('lodash'); +var chai = require('chai'); +var sinon = require('sinon'); +var should = chai.should(); +var Lock = require('../lib/locallock'); + + +describe('Local locks', function() { + var lock; + beforeEach(function() { + this.clock = sinon.useFakeTimers(); + lock = new Lock(); + }); + afterEach(function() { + this.clock.restore(); + }); + it('should lock tasks using the same token', function() { + var a = false, + b = false; + lock.locked('123', 0, 0, function(err, release) { + should.not.exist(err); + a = true; + setTimeout(function() { + release(); + }, 5); + lock.locked('123', 0, 0, function(err, release) { + should.not.exist(err); + b = true; + release(); + }); + }); + a.should.equal(true); + b.should.equal(false); + this.clock.tick(10); + a.should.equal(true); + b.should.equal(true); + }); + it('should not lock tasks using different tokens', function() { + var i = 0; + lock.locked('123', 0, 0, function(err, release) { + should.not.exist(err); + i++; + setTimeout(function() { + release(); + }, 5); + lock.locked('456', 0, 0, function(err, release) { + should.not.exist(err); + i++; + release(); + }); + }); + i.should.equal(2); + }); + it('should return error if unable to acquire lock', function() { + lock.locked('123', 0, 0, function(err, release) { + should.not.exist(err); + setTimeout(function() { + release(); + }, 5); + lock.locked('123', 1, 0, function(err, release) { + should.exist(err); + err.toString().should.contain('Could not acquire lock 123'); + }); + }); + this.clock.tick(2); + }); + it('should release lock if acquired for a long time', function() { + var i = 0; + lock.locked('123', 0, 3, function(err, release) { + should.not.exist(err); + i++; + lock.locked('123', 20, 0, function(err, release) { + should.not.exist(err); + i++; + release(); + }); + }); + i.should.equal(1); + this.clock.tick(1); + i.should.equal(1); + this.clock.tick(10); + i.should.equal(2); + }); + it('should only release one pending task on lock timeout', function() { + var i = 0; + lock.locked('123', 0, 3, function(err, release) { + should.not.exist(err); + i++; + lock.locked('123', 5, 0, function(err, release) { + should.not.exist(err); + i++; + setTimeout(function() { + release(); + }, 5); + }); + lock.locked('123', 20, 0, function(err, release) { + should.not.exist(err); + i++; + release(); + }); + }); + i.should.equal(1); + this.clock.tick(4); + i.should.equal(2) + this.clock.tick(7); + i.should.equal(3) + }); + +}); diff --git a/test/lock.js b/test/lock.js deleted file mode 100644 index 95ae2fe..0000000 --- a/test/lock.js +++ /dev/null @@ -1,50 +0,0 @@ -'use strict'; - -var _ = require('lodash'); -var chai = require('chai'); -var sinon = require('sinon'); -var should = chai.should(); -var Lock = require('../lib/lock'); - -describe('Lock', function() { - it('should lock tasks using the same token', function(done) { - var a = false, - b = false; - Lock.get('123', function(lock) { - a = true; - setTimeout(function() { - lock.free(); - }, 5); - Lock.get('123', function(lock) { - b = true; - lock.free(); - }); - }); - setTimeout(function() { - a.should.equal(true); - b.should.equal(false); - }, 1); - setTimeout(function() { - a.should.equal(true); - b.should.equal(true); - done(); - }, 8); - }); - it('should not lock tasks using different tokens', function(done) { - var i = 0; - Lock.get('123', function(lock) { - i++; - setTimeout(function() { - lock.free(); - }, 5); - Lock.get('456', function(lock) { - i++; - lock.free(); - }); - }); - setTimeout(function() { - i.should.equal(2); - done(); - }, 1); - }); -});