From b555e3fe9430182371e7b84bc5e8390ea34d23de Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Tue, 7 Apr 2015 16:34:48 -0300 Subject: [PATCH 1/8] add locker module --- config.js | 8 ++++++++ package.json | 1 + 2 files changed, 9 insertions(+) diff --git a/config.js b/config.js index c13db93..c25aabc 100644 --- a/config.js +++ b/config.js @@ -21,6 +21,14 @@ var config = { }, */ }, + lockOpts: { + /* To use locker-server, uncomment this: + lockerServer: { + host: 'localhost', + port: 3003, + }, + */ + }, }, }; module.exports = config; diff --git a/package.json b/package.json index fe64798..eeb3e79 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "inherits": "^2.0.1", "leveldown": "^0.10.0", "levelup": "^0.19.0", + "locker": "^0.1.0", "lodash": "^3.3.1", "mocha-lcov-reporter": "0.0.1", "morgan": "*", From 0631083baeefc1d092011a67738056d3ed847c5a Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Tue, 7 Apr 2015 17:02:08 -0300 Subject: [PATCH 2/8] common interface for locks --- lib/locallock.js | 34 +++++++++++++++++++++++++++++ lib/lock.js | 40 +++++++++++----------------------- lib/server.js | 21 ++++++++++-------- lib/utils.js | 16 -------------- test/{lock.js => locallock.js} | 4 ++-- 5 files changed, 61 insertions(+), 54 deletions(-) create mode 100644 lib/locallock.js rename test/{lock.js => locallock.js} (93%) diff --git a/lib/locallock.js b/lib/locallock.js new file mode 100644 index 0000000..6ca71a4 --- /dev/null +++ b/lib/locallock.js @@ -0,0 +1,34 @@ +var _ = require('lodash'); +var $ = require('preconditions').singleton(); + +var locks = {}; + +var Lock = function() { + this.taken = false; + this.queue = []; +}; + +Lock.prototype.free = function() { + if (this.queue.length > 0) { + var f = this.queue.shift(); + f(this); + } else { + this.taken = false; + } +}; + +Lock.get = function(key, callback) { + if (_.isUndefined(locks[key])) { + locks[key] = new Lock(); + } + var lock = locks[key]; + + if (lock.taken) { + lock.queue.push(callback); + } else { + lock.taken = true; + callback(lock); + } +}; + +module.exports = Lock; diff --git a/lib/lock.js b/lib/lock.js index 6ca71a4..65e7259 100644 --- a/lib/lock.js +++ b/lib/lock.js @@ -1,34 +1,20 @@ -var _ = require('lodash'); var $ = require('preconditions').singleton(); +var _ = require('lodash'); +var LocalLock = require('./locallock'); +var RemoteLock = require('locker'); -var locks = {}; - -var Lock = function() { - this.taken = false; - this.queue = []; -}; - -Lock.prototype.free = function() { - if (this.queue.length > 0) { - var f = this.queue.shift(); - f(this); - } else { - this.taken = false; - } -}; +function Lock(opts) {}; -Lock.get = function(key, callback) { - if (_.isUndefined(locks[key])) { - locks[key] = new Lock(); - } - var lock = locks[key]; +Lock.prototype.runLocked = function(token, cb, task) { + $.shouldBeDefined(token); - if (lock.taken) { - lock.queue.push(callback); - } else { - lock.taken = true; - callback(lock); - } + LocalLock.get(token, function(lock) { + var _cb = function() { + cb.apply(null, arguments); + lock.free(); + }; + task(_cb); + }); }; module.exports = Lock; diff --git a/lib/server.js b/lib/server.js index 9f804b8..abf662a 100644 --- a/lib/server.js +++ b/lib/server.js @@ -13,6 +13,7 @@ var Address = Bitcore.Address; var ClientError = require('./clienterror'); var Utils = require('./utils'); +var Lock = require('./lock'); var Storage = require('./storage'); var NotificationBroadcaster = require('./notificationbroadcaster'); var BlockchainExplorer = require('./blockchainexplorer'); @@ -24,7 +25,7 @@ var TxProposal = require('./model/txproposal'); var Notification = require('./model/notification'); var initialized = false; -var storage, blockchainExplorer; +var lock, storage, blockchainExplorer; /** @@ -35,6 +36,7 @@ function WalletService() { if (!initialized) throw new Error('Server not initialized'); + this.lock = lock; this.storage = storage; this.blockchainExplorer = blockchainExplorer; this.notifyTicker = 0; @@ -52,7 +54,8 @@ WalletService.onNotification = function(func) { */ WalletService.initialize = function(opts) { opts = opts || {}; - storage = opts.storage ||  new Storage(opts.storageOpts); + lock = opts.lock || new Lock(opts.lockOpts); + storage = opts.storage || new Storage(opts.storageOpts); blockchainExplorer = opts.blockchainExplorer; initialized = true; }; @@ -190,7 +193,7 @@ WalletService.prototype.replaceTemporaryRequestKey = function(opts, cb) { if (opts.isTemporaryRequestKey) return cb(new ClientError('Bad arguments')); - Utils.runLocked(self.walletId, cb, function(cb) { + self.lock.runLocked(self.walletId, cb, function(cb) { self.storage.fetchWallet(self.walletId, function(err, wallet) { if (err) return cb(err); @@ -298,7 +301,7 @@ WalletService.prototype.joinWallet = function(opts, cb) { if (_.isEmpty(opts.name)) return cb(new ClientError('Invalid copayer name')); - Utils.runLocked(opts.walletId, cb, function(cb) { + self.lock.runLocked(opts.walletId, cb, function(cb) { self.storage.fetchWallet(opts.walletId, function(err, wallet) { if (err) return cb(err); @@ -357,7 +360,7 @@ WalletService.prototype.joinWallet = function(opts, cb) { WalletService.prototype.createAddress = function(opts, cb) { var self = this; - Utils.runLocked(self.walletId, cb, function(cb) { + self.lock.runLocked(self.walletId, cb, function(cb) { self.getWallet({}, function(err, wallet) { if (err) return cb(err); if (!wallet.isComplete()) @@ -612,7 +615,7 @@ WalletService.prototype.createTx = function(opts, cb) { if (!Utils.checkRequired(opts, ['toAddress', 'amount', 'proposalSignature'])) return cb(new ClientError('Required argument missing')); - Utils.runLocked(self.walletId, cb, function(cb) { + self.lock.runLocked(self.walletId, cb, function(cb) { self.getWallet({}, function(err, wallet) { if (err) return cb(err); if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete')); @@ -702,7 +705,7 @@ WalletService.prototype.getTx = function(opts, cb) { WalletService.prototype.removeWallet = function(opts, cb) { var self = this; - Utils.runLocked(self.walletId, cb, function(cb) { + self.lock.runLocked(self.walletId, cb, function(cb) { self.storage.removeWallet(self.walletId, cb); }); }; @@ -720,7 +723,7 @@ WalletService.prototype.removePendingTx = function(opts, cb) { if (!Utils.checkRequired(opts, ['txProposalId'])) return cb(new ClientError('Required argument missing')); - Utils.runLocked(self.walletId, cb, function(cb) { + self.lock.runLocked(self.walletId, cb, function(cb) { self.getTx({ txProposalId: opts.txProposalId, @@ -1178,7 +1181,7 @@ WalletService.prototype.scan = function(opts, cb) { }; - Utils.runLocked(self.walletId, cb, function(cb) { + self.lock.runLocked(self.walletId, cb, function(cb) { self.getWallet({}, function(err, wallet) { if (err) return cb(err); if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete')); diff --git a/lib/utils.js b/lib/utils.js index b39fb7a..aadc018 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,24 +1,8 @@ var $ = require('preconditions').singleton(); var _ = require('lodash'); -var Lock = require('./lock'); var Utils = {}; -Utils.runLocked = function(token, cb, task) { - var self = this; - - $.shouldBeDefined(token); - - Lock.get(token, function(lock) { - var _cb = function() { - cb.apply(null, arguments); - lock.free(); - }; - task(_cb); - }); -}; - - Utils.checkRequired = function(obj, args) { args = [].concat(args); if (!_.isObject(obj)) return false; diff --git a/test/lock.js b/test/locallock.js similarity index 93% rename from test/lock.js rename to test/locallock.js index 95ae2fe..ea98ecf 100644 --- a/test/lock.js +++ b/test/locallock.js @@ -4,9 +4,9 @@ var _ = require('lodash'); var chai = require('chai'); var sinon = require('sinon'); var should = chai.should(); -var Lock = require('../lib/lock'); +var Lock = require('../lib/locallock'); -describe('Lock', function() { +describe('Local lock', function() { it('should lock tasks using the same token', function(done) { var a = false, b = false; From c33e0b0de45d666e17ea2cd39a0fb8baaabf955e Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 8 Apr 2015 10:21:01 -0300 Subject: [PATCH 3/8] local lock --- lib/locallock.js | 40 ++++++++++++++++++++-------------------- lib/lock.js | 9 ++++++--- test/locallock.js | 27 ++++++++++++++++++--------- 3 files changed, 44 insertions(+), 32 deletions(-) diff --git a/lib/locallock.js b/lib/locallock.js index 6ca71a4..4826bad 100644 --- a/lib/locallock.js +++ b/lib/locallock.js @@ -1,33 +1,33 @@ var _ = require('lodash'); var $ = require('preconditions').singleton(); - var locks = {}; -var Lock = function() { - this.taken = false; - this.queue = []; +function Lock() { + }; -Lock.prototype.free = function() { - if (this.queue.length > 0) { - var f = this.queue.shift(); - f(this); - } else { - this.taken = false; - } +Lock.prototype._runOne = function(token) { + var self = this; + + if (locks[token].length == 0) return; + + var task = locks[token][0]; + + task(null, function() { + locks[token].shift(); + self._runOne(token); + }); }; -Lock.get = function(key, callback) { - if (_.isUndefined(locks[key])) { - locks[key] = new Lock(); +Lock.prototype.locked = function(token, wait, max, task) { + if (_.isUndefined(locks[token])) { + locks[token] = []; } - var lock = locks[key]; - if (lock.taken) { - lock.queue.push(callback); - } else { - lock.taken = true; - callback(lock); + locks[token].push(task); + + if (locks[token].length == 1) { + this._runOne(token); } }; diff --git a/lib/lock.js b/lib/lock.js index 65e7259..f5435df 100644 --- a/lib/lock.js +++ b/lib/lock.js @@ -3,15 +3,18 @@ var _ = require('lodash'); var LocalLock = require('./locallock'); var RemoteLock = require('locker'); -function Lock(opts) {}; +function Lock(opts) { + this.lock = new LocalLock(); +}; Lock.prototype.runLocked = function(token, cb, task) { $.shouldBeDefined(token); - LocalLock.get(token, function(lock) { + this.lock.locked(token, 2 * 1000, 10 * 60 * 1000, function(err, release) { + if (err) return cb(new Error('Wallet is locked')); var _cb = function() { cb.apply(null, arguments); - lock.free(); + release(); }; task(_cb); }); diff --git a/test/locallock.js b/test/locallock.js index ea98ecf..7e0b8f5 100644 --- a/test/locallock.js +++ b/test/locallock.js @@ -6,18 +6,25 @@ var sinon = require('sinon'); var should = chai.should(); var Lock = require('../lib/locallock'); -describe('Local lock', function() { + +describe('Local locks', function() { + var lock; + beforeEach(function() { + lock = new Lock(); + }); it('should lock tasks using the same token', function(done) { var a = false, b = false; - Lock.get('123', function(lock) { + lock.locked('123', 0, 0, function(err, release) { + should.not.exist(err); a = true; setTimeout(function() { - lock.free(); + release(); }, 5); - Lock.get('123', function(lock) { + lock.locked('123', 0, 0, function(err, release) { + should.not.exist(err); b = true; - lock.free(); + release(); }); }); setTimeout(function() { @@ -32,14 +39,16 @@ describe('Local lock', function() { }); it('should not lock tasks using different tokens', function(done) { var i = 0; - Lock.get('123', function(lock) { + lock.locked('123', 0, 0, function(err, release) { + should.not.exist(err); i++; setTimeout(function() { - lock.free(); + release(); }, 5); - Lock.get('456', function(lock) { + lock.locked('456', 0, 0, function(err, release) { + should.not.exist(err); i++; - lock.free(); + release(); }); }); setTimeout(function() { From 7a8a7ea997c4e74a6c878cea080397072ee05a94 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 8 Apr 2015 11:39:15 -0300 Subject: [PATCH 4/8] timeouts --- lib/locallock.js | 38 ++++++++++++++++++++++++++++++-------- test/locallock.js | 29 +++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/lib/locallock.js b/lib/locallock.js index 4826bad..e794819 100644 --- a/lib/locallock.js +++ b/lib/locallock.js @@ -9,26 +9,48 @@ function Lock() { Lock.prototype._runOne = function(token) { var self = this; - if (locks[token].length == 0) return; - - var task = locks[token][0]; + var item = _.first(locks[token]); + if (!item || item.started) return; + + item.started = true; + if (item.maxRunningTime > 0) { + setTimeout(function() { + var it = _.first(locks[token]); + if (it != item) return; + locks[token].shift(); + self._runOne(token); + }, item.maxRunningTime); + } - task(null, function() { + item.fn(null, function() { locks[token].shift(); self._runOne(token); }); }; Lock.prototype.locked = function(token, wait, max, task) { + var self = this; + if (_.isUndefined(locks[token])) { locks[token] = []; } - locks[token].push(task); - - if (locks[token].length == 1) { - this._runOne(token); + var item = { + maxRunningTime: max, + started: false, + fn: task, + }; + locks[token].push(item); + + if (wait > 0) { + setTimeout(function() { + var it = _.find(locks[token], item); + if (!it || it.started) return; + locks[token] = _.without(locks[token], it); + it.fn(new Error('Could not acquire lock ' + token)); + }, wait); } + self._runOne(token); }; module.exports = Lock; diff --git a/test/locallock.js b/test/locallock.js index 7e0b8f5..468511e 100644 --- a/test/locallock.js +++ b/test/locallock.js @@ -56,4 +56,33 @@ describe('Local locks', function() { done(); }, 1); }); + it('should return error if unable to acquire lock', function(done) { + lock.locked('123', 0, 0, function(err, release) { + should.not.exist(err); + setTimeout(function() { + release(); + }, 5); + lock.locked('123', 1, 0, function(err, release) { + should.exist(err); + err.toString().should.contain('Could not acquire lock 123'); + done(); + }); + }); + }); + it('should release lock if acquired for a long time', function(done) { + var i = 0; + lock.locked('123', 0, 3, function(err, release) { + should.not.exist(err); + i++; + lock.locked('123', 15, 0, function(err, release) { + should.not.exist(err); + i++; + }); + }); + setTimeout(function() { + i.should.equal(2); + done(); + }, 10); + }); + }); From 1bbdd4c14c871ee8d7bbbd61c3dca9c3e1b0fd6c Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 8 Apr 2015 15:02:33 -0300 Subject: [PATCH 5/8] sample server --- lib/lock.js | 17 ++++++++++++++++- lib/storage.js | 38 +++++++++++++++++++------------------- locker-server.js | 6 ++++++ package.json | 1 + 4 files changed, 42 insertions(+), 20 deletions(-) create mode 100644 locker-server.js diff --git a/lib/lock.js b/lib/lock.js index f5435df..0c91456 100644 --- a/lib/lock.js +++ b/lib/lock.js @@ -1,10 +1,25 @@ var $ = require('preconditions').singleton(); var _ = require('lodash'); +var log = require('npmlog'); +log.debug = log.verbose; + var LocalLock = require('./locallock'); var RemoteLock = require('locker'); function Lock(opts) { - this.lock = new LocalLock(); + opts = opts || {}; + if (opts.lockerServer) { + this.lock = new RemoteLock(opts.lockerServer.port, opts.lockerServer.host); + + this.lock.on('reset', function() { + log.debug('Locker server reset'); + }); + this.lock.on('error', function(error) { + log.error('Locker server threw error', error); + }); + } else { + this.lock = new LocalLock(); + } }; Lock.prototype.runLocked = function(token, cb, task) { diff --git a/lib/storage.js b/lib/storage.js index 32583c8..e595b7d 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -147,9 +147,9 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) { var txs = []; var key = KEY.PENDING_TXP(walletId); this.db.createReadStream({ - gte: key, - lt: key + '~' - }) + gte: key, + lt: key + '~' + }) .on('data', function(data) { txs.push(TxProposal.fromObj(data.value)); }) @@ -183,11 +183,11 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) { var endkey = KEY.TXP(walletId, opts.maxTs); this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: true, - limit: opts.limit, - }) + gt: key, + lt: endkey + '~', + reverse: true, + limit: opts.limit, + }) .on('data', function(data) { txs.push(TxProposal.fromObj(data.value)); }) @@ -220,11 +220,11 @@ Storage.prototype.fetchNotifications = function(walletId, opts, cb) { var endkey = KEY.NOTIFICATION(walletId, opts.maxTs); this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: opts.reverse, - limit: opts.limit, - }) + gt: key, + lt: endkey + '~', + reverse: opts.reverse, + limit: opts.limit, + }) .on('data', function(data) { txs.push(Notification.fromObj(data.value)); }) @@ -284,9 +284,9 @@ Storage.prototype._delByKey = function(key, cb) { var self = this; var keys = []; this.db.createKeyStream({ - gte: key, - lt: key + '~', - }) + gte: key, + lt: key + '~', + }) .on('data', function(key) { keys.push(key); }) @@ -341,9 +341,9 @@ Storage.prototype.fetchAddresses = function(walletId, cb) { var addresses = []; var key = KEY.ADDRESS(walletId); this.db.createReadStream({ - gte: key, - lt: key + '~' - }) + gte: key, + lt: key + '~' + }) .on('data', function(data) { addresses.push(Address.fromObj(data.value)); }) diff --git a/locker-server.js b/locker-server.js new file mode 100644 index 0000000..4610684 --- /dev/null +++ b/locker-server.js @@ -0,0 +1,6 @@ +(function() { + var Locker = require('locker-server'), + locker = new Locker(); + + locker.listen(3003); +})(); diff --git a/package.json b/package.json index eeb3e79..7f25b9b 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "leveldown": "^0.10.0", "levelup": "^0.19.0", "locker": "^0.1.0", + "locker-server": "^0.1.3", "lodash": "^3.3.1", "mocha-lcov-reporter": "0.0.1", "morgan": "*", From 8e259c0e09ae1a51dda96c9a572fdb0944e64e2b Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 8 Apr 2015 15:18:28 -0300 Subject: [PATCH 6/8] simplify locking syntax --- lib/server.js | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/lib/server.js b/lib/server.js index abf662a..948b7c0 100644 --- a/lib/server.js +++ b/lib/server.js @@ -91,6 +91,11 @@ WalletService.getInstanceWithAuth = function(opts, cb) { }); }; +WalletService.prototype._runLocked = function(cb, task) { + $.checkState(this.walletId); + this.lock.runLocked(this.walletId, cb, task); +}; + /** * Creates a new wallet. @@ -193,7 +198,7 @@ WalletService.prototype.replaceTemporaryRequestKey = function(opts, cb) { if (opts.isTemporaryRequestKey) return cb(new ClientError('Bad arguments')); - self.lock.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.storage.fetchWallet(self.walletId, function(err, wallet) { if (err) return cb(err); @@ -301,7 +306,8 @@ WalletService.prototype.joinWallet = function(opts, cb) { if (_.isEmpty(opts.name)) return cb(new ClientError('Invalid copayer name')); - self.lock.runLocked(opts.walletId, cb, function(cb) { + self.walletId = opts.walletId; + self._runLocked(cb, function(cb) { self.storage.fetchWallet(opts.walletId, function(err, wallet) { if (err) return cb(err); @@ -360,7 +366,7 @@ WalletService.prototype.joinWallet = function(opts, cb) { WalletService.prototype.createAddress = function(opts, cb) { var self = this; - self.lock.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.getWallet({}, function(err, wallet) { if (err) return cb(err); if (!wallet.isComplete()) @@ -615,7 +621,7 @@ WalletService.prototype.createTx = function(opts, cb) { if (!Utils.checkRequired(opts, ['toAddress', 'amount', 'proposalSignature'])) return cb(new ClientError('Required argument missing')); - self.lock.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.getWallet({}, function(err, wallet) { if (err) return cb(err); if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete')); @@ -705,7 +711,7 @@ WalletService.prototype.getTx = function(opts, cb) { WalletService.prototype.removeWallet = function(opts, cb) { var self = this; - self.lock.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.storage.removeWallet(self.walletId, cb); }); }; @@ -723,7 +729,7 @@ WalletService.prototype.removePendingTx = function(opts, cb) { if (!Utils.checkRequired(opts, ['txProposalId'])) return cb(new ClientError('Required argument missing')); - self.lock.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.getTx({ txProposalId: opts.txProposalId, @@ -1181,7 +1187,7 @@ WalletService.prototype.scan = function(opts, cb) { }; - self.lock.runLocked(self.walletId, cb, function(cb) { + self._runLocked(cb, function(cb) { self.getWallet({}, function(err, wallet) { if (err) return cb(err); if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete')); From 34790ec2e247b19bbf979594370ea695181d57eb Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 8 Apr 2015 15:28:35 -0300 Subject: [PATCH 7/8] increase wait & max time for locks --- lib/lock.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/lock.js b/lib/lock.js index 0c91456..dda8e9d 100644 --- a/lib/lock.js +++ b/lib/lock.js @@ -25,7 +25,7 @@ function Lock(opts) { Lock.prototype.runLocked = function(token, cb, task) { $.shouldBeDefined(token); - this.lock.locked(token, 2 * 1000, 10 * 60 * 1000, function(err, release) { + this.lock.locked(token, 5 * 1000, 24 * 60 * 60 * 1000, function(err, release) { if (err) return cb(new Error('Wallet is locked')); var _cb = function() { cb.apply(null, arguments); From 59996ef073a4352ecc99c5f6b79ecbd9443426e0 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 8 Apr 2015 16:38:01 -0300 Subject: [PATCH 8/8] refactor tests to use fake timers --- lib/locallock.js | 57 ++++++++++++++++++++++----------------- test/locallock.js | 69 +++++++++++++++++++++++++++++++---------------- 2 files changed, 78 insertions(+), 48 deletions(-) diff --git a/lib/locallock.js b/lib/locallock.js index e794819..cddc50c 100644 --- a/lib/locallock.js +++ b/lib/locallock.js @@ -1,55 +1,62 @@ var _ = require('lodash'); var $ = require('preconditions').singleton(); -var locks = {}; function Lock() { + this.tasks = {}; +}; +Lock.prototype._release = function(token, task) { + if (!task.running) return; + task.running = false; + this.tasks[token] = _.without(this.tasks[token], task); + this._runOne(token); }; Lock.prototype._runOne = function(token) { var self = this; - var item = _.first(locks[token]); - if (!item || item.started) return; + if (_.any(self.tasks[token], { + running: true + })) return; + + var task = _.first(self.tasks[token]); + if (!task) return; - item.started = true; - if (item.maxRunningTime > 0) { + task.running = true; + + if (task.timeout > 0) { setTimeout(function() { - var it = _.first(locks[token]); - if (it != item) return; - locks[token].shift(); - self._runOne(token); - }, item.maxRunningTime); + self._release(token, task); + }, task.timeout); } - item.fn(null, function() { - locks[token].shift(); - self._runOne(token); + task.fn(null, function() { + self._release(token, task); }); }; -Lock.prototype.locked = function(token, wait, max, task) { +Lock.prototype.locked = function(token, wait, max, userTask) { var self = this; - if (_.isUndefined(locks[token])) { - locks[token] = []; + if (_.isUndefined(self.tasks[token])) { + self.tasks[token] = []; } - var item = { - maxRunningTime: max, - started: false, - fn: task, + var task = { + timeout: max, + running: false, + fn: userTask, }; - locks[token].push(item); + self.tasks[token].push(task); if (wait > 0) { setTimeout(function() { - var it = _.find(locks[token], item); - if (!it || it.started) return; - locks[token] = _.without(locks[token], it); - it.fn(new Error('Could not acquire lock ' + token)); + if (task.running || !_.contains(self.tasks[token], task)) return; + self.tasks[token] = _.without(self.tasks[token], task); + task.fn(new Error('Could not acquire lock ' + token)); }, wait); } + self._runOne(token); }; diff --git a/test/locallock.js b/test/locallock.js index 468511e..5604511 100644 --- a/test/locallock.js +++ b/test/locallock.js @@ -10,9 +10,13 @@ var Lock = require('../lib/locallock'); describe('Local locks', function() { var lock; beforeEach(function() { + this.clock = sinon.useFakeTimers(); lock = new Lock(); }); - it('should lock tasks using the same token', function(done) { + afterEach(function() { + this.clock.restore(); + }); + it('should lock tasks using the same token', function() { var a = false, b = false; lock.locked('123', 0, 0, function(err, release) { @@ -27,17 +31,13 @@ describe('Local locks', function() { release(); }); }); - setTimeout(function() { - a.should.equal(true); - b.should.equal(false); - }, 1); - setTimeout(function() { - a.should.equal(true); - b.should.equal(true); - done(); - }, 8); + a.should.equal(true); + b.should.equal(false); + this.clock.tick(10); + a.should.equal(true); + b.should.equal(true); }); - it('should not lock tasks using different tokens', function(done) { + it('should not lock tasks using different tokens', function() { var i = 0; lock.locked('123', 0, 0, function(err, release) { should.not.exist(err); @@ -51,12 +51,9 @@ describe('Local locks', function() { release(); }); }); - setTimeout(function() { - i.should.equal(2); - done(); - }, 1); + i.should.equal(2); }); - it('should return error if unable to acquire lock', function(done) { + it('should return error if unable to acquire lock', function() { lock.locked('123', 0, 0, function(err, release) { should.not.exist(err); setTimeout(function() { @@ -65,24 +62,50 @@ describe('Local locks', function() { lock.locked('123', 1, 0, function(err, release) { should.exist(err); err.toString().should.contain('Could not acquire lock 123'); - done(); }); }); + this.clock.tick(2); }); - it('should release lock if acquired for a long time', function(done) { + it('should release lock if acquired for a long time', function() { var i = 0; lock.locked('123', 0, 3, function(err, release) { should.not.exist(err); i++; - lock.locked('123', 15, 0, function(err, release) { + lock.locked('123', 20, 0, function(err, release) { should.not.exist(err); i++; + release(); + }); + }); + i.should.equal(1); + this.clock.tick(1); + i.should.equal(1); + this.clock.tick(10); + i.should.equal(2); + }); + it('should only release one pending task on lock timeout', function() { + var i = 0; + lock.locked('123', 0, 3, function(err, release) { + should.not.exist(err); + i++; + lock.locked('123', 5, 0, function(err, release) { + should.not.exist(err); + i++; + setTimeout(function() { + release(); + }, 5); + }); + lock.locked('123', 20, 0, function(err, release) { + should.not.exist(err); + i++; + release(); }); }); - setTimeout(function() { - i.should.equal(2); - done(); - }, 10); + i.should.equal(1); + this.clock.tick(4); + i.should.equal(2) + this.clock.tick(7); + i.should.equal(3) }); });