Browse Source

Merge pull request #2 from matiu/feat/notifications2

Feat/notifications
activeAddress
Ivan Socolsky 10 years ago
parent
commit
9cadcfe7f6
  1. 48
      lib/model/notification.js
  2. 6
      lib/model/txproposal.js
  3. 117
      lib/server.js
  4. 93
      lib/storage.js
  5. 2
      package.json
  6. 141
      test/integration.js

48
lib/model/notification.js

@ -0,0 +1,48 @@
var Uuid = require('uuid');
/*
* notifications examples
*
* NewCopayer -
* NewAddress -
* NewTxProposal - (amount)
* TxProposalAcceptedBy - (txProposalId, copayerId)
* TxProposalRejectedBy - (txProposalId, copayerId)
* txProposalFinallyRejected - txProposalId
* txProposalFinallyAccepted - txProposalId
*
* newIncommingTx (amount)
* newOutgoingTx - (txProposalId, txid)
*
* data Examples:
* { amount: 'xxx', address: 'xxx'}
* { txProposalId: 'xxx', copayerId: 'xxx' }
*
* Data is meant to provide only the needed information
* to notify the user
*
*/
function Notification(opts) {
opts = opts || {};
var now = Date.now();
this.createdOn = Math.floor(now / 1000);
this.id = ('00000000000000' + now).slice(-14) + ('0000' + opts.ticker||0).slice(-4) ;
this.type = opts.type || 'general';
this.data = opts.data;
};
Notification.fromObj = function(obj) {
var x= new Notification();
x.createdOn = obj.createdOn;
x.type = obj.type,
x.data = obj.data;
return x;
};
module.exports = Notification;

6
lib/model/txproposal.js

@ -13,8 +13,10 @@ function TxProposal(opts) {
opts = opts || {};
this.version = VERSION;
this.createdOn = Math.floor(Date.now() / 1000);
this.id = ('000000000000' + this.createdOn).slice(-12) + Uuid.v4();
var now = Date.now();
this.createdOn = Math.floor(now / 1000);
this.id = ('00000000000000' + now).slice(-14) + Uuid.v4();
this.creatorId = opts.creatorId;
this.toAddress = opts.toAddress;
this.amount = opts.amount;

117
lib/server.js

@ -6,6 +6,7 @@ var log = require('npmlog');
log.debug = log.verbose;
var inherits = require('inherits');
var events = require('events');
var nodeutil = require('util');
var Bitcore = require('bitcore');
var PublicKey = Bitcore.PublicKey;
@ -22,6 +23,7 @@ var Wallet = require('./model/wallet');
var Copayer = require('./model/copayer');
var Address = require('./model/address');
var TxProposal = require('./model/txproposal');
var Notification = require('./model/Notification');
var initialized = false;
@ -34,8 +36,11 @@ var storage;
function CopayServer() {
if (!initialized) throw new Error('Server not initialized');
this.storage = storage;
this.notifyTicker = 0;
};
nodeutil.inherits(CopayServer, events.EventEmitter);
/**
* Initializes global settings for all instances.
@ -61,7 +66,8 @@ CopayServer.getInstance = function() {
*/
CopayServer.getInstanceWithAuth = function(opts, cb) {
if (!Utils.checkRequired(opts, ['copayerId', 'message', 'signature'])) return cb(new ClientError('Required argument missing'));
if (!Utils.checkRequired(opts, ['copayerId', 'message', 'signature']))
return cb(new ClientError('Required argument missing'));
var server = new CopayServer();
server.storage.fetchCopayerLookup(opts.copayerId, function(err, copayer) {
@ -148,6 +154,29 @@ CopayServer.prototype._verifySignature = function(text, signature, pubKey) {
return SignUtils.verify(text, signature, pubKey);
};
/**
* _notify
*
* @param type
* @param data
*/
CopayServer.prototype._notify = function(type, data) {
var self = this;
var walletId = self.walletId || data.walletId;
$.checkState(walletId);
var n = new Notification({
type: type,
data: data,
ticker: this.notifyTicker++,
});
this.storage.storeNotification(walletId, n, function() {
self.emit(n);
});
};
/**
* Joins a wallet in creation.
* @param {Object} opts
@ -162,7 +191,8 @@ CopayServer.prototype.joinWallet = function(opts, cb) {
if (!Utils.checkRequired(opts, ['walletId', 'name', 'xPubKey', 'xPubKeySignature']))
return cb(new ClientError('Required argument missing'));
if (_.isEmpty(opts.name)) return cb(new ClientError('Invalid copayer name'));
if (_.isEmpty(opts.name))
return cb(new ClientError('Invalid copayer name'));
Utils.runLocked(opts.walletId, cb, function(cb) {
self.storage.fetchWallet(opts.walletId, function(err, wallet) {
@ -176,7 +206,8 @@ CopayServer.prototype.joinWallet = function(opts, cb) {
if (_.find(wallet.copayers, {
xPubKey: opts.xPubKey
})) return cb(new ClientError('CINWALLET', 'Copayer already in wallet'));
if (wallet.copayers.length == wallet.n) return cb(new ClientError('WFULL', 'Wallet full'));
if (wallet.copayers.length == wallet.n)
return cb(new ClientError('WFULL', 'Wallet full'));
var copayer = new Copayer({
name: opts.name,
@ -187,6 +218,10 @@ CopayServer.prototype.joinWallet = function(opts, cb) {
wallet.addCopayer(copayer);
self.storage.storeWalletAndUpdateCopayersLookup(wallet, function(err) {
self._notify('NewCopayer', {
walletId: opts.walletId,
copayerId: copayer.id,
});
return cb(err, copayer.id);
});
});
@ -204,13 +239,15 @@ CopayServer.prototype.createAddress = function(opts, cb) {
Utils.runLocked(self.walletId, cb, function(cb) {
self.getWallet({}, function(err, wallet) {
if (err) return cb(err);
if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete'));
if (!wallet.isComplete())
return cb(new ClientError('Wallet is not complete'));
var address = wallet.createAddress(false);
self.storage.storeAddressAndWallet(wallet, address, function(err) {
if (err) return cb(err);
self._notify('NewAddress');
return cb(null, address);
});
});
@ -403,11 +440,13 @@ CopayServer.prototype.createTx = function(opts, cb) {
self.getWallet({}, function(err, wallet) {
if (err) return cb(err);
if (!wallet.isComplete()) return cb(new ClientError('Wallet is not complete'));
if (wallet.isShared() && !Utils.checkRequired(opts, 'proposalSignature')) return cb(new ClientError('Proposal signature is required for shared wallets'));
if (wallet.isShared() && !Utils.checkRequired(opts, 'proposalSignature'))
return cb(new ClientError('Proposal signature is required for shared wallets'));
var copayer = wallet.getCopayer(self.copayerId);
var msg = opts.toAddress + '|' + opts.amount + '|' + opts.message;
if (!self._verifySignature(msg, opts.proposalSignature, copayer.signingPubKey)) return cb(new ClientError('Invalid proposal signature'));
if (!self._verifySignature(msg, opts.proposalSignature, copayer.signingPubKey))
return cb(new ClientError('Invalid proposal signature'));
var toAddress;
try {
@ -415,7 +454,8 @@ CopayServer.prototype.createTx = function(opts, cb) {
} catch (ex) {
return cb(new ClientError('INVALIDADDRESS', 'Invalid address'));
}
if (toAddress.network != wallet.getNetworkName()) return cb(new ClientError('INVALIDADDRESS', 'Incorrect address network'));
if (toAddress.network != wallet.getNetworkName())
return cb(new ClientError('INVALIDADDRESS', 'Incorrect address network'));
self._getUtxos(function(err, utxos) {
if (err) return cb(err);
@ -449,6 +489,9 @@ CopayServer.prototype.createTx = function(opts, cb) {
self.storage.storeTx(wallet.id, txp, function(err) {
if (err) return cb(err);
self._notify('NewTxProposal', {
amount: opts.amount
});
return cb(null, txp);
});
});
@ -521,6 +564,7 @@ CopayServer.prototype.removePendingTx = function(opts, cb) {
if (actors.length > 1 || (actors.length == 1 && actors[0] !== self.copayerId))
return cb(new ClientError('Cannot remove a proposal signed/rejected by other copayers'));
self._notify('transactionProposalRemoved');
self.storage.removeTx(self.walletId, opts.id, cb);
});
});
@ -573,7 +617,17 @@ CopayServer.prototype.signTx = function(opts, cb) {
self.storage.storeTx(self.walletId, txp, function(err) {
if (err) return cb(err);
self._notify('TxProposalAcceptedBy', {
txProposalId: opts.txProposalId,
copayerId: self.copayerId,
});
if (txp.status == 'accepted') {
self._notify('TxProposalFinallyAccepted', {
txProposalId: opts.txProposalId,
});
self._broadcastTx(txp, function(err, txid) {
if (err) return cb(err);
@ -581,6 +635,11 @@ CopayServer.prototype.signTx = function(opts, cb) {
self.storage.storeTx(self.walletId, txp, function(err) {
if (err) return cb(err);
self._notify('NewOutgoingTx', {
txProposalId: opts.txProposalId,
txid: txid
});
return cb(null, txp);
});
});
@ -612,14 +671,29 @@ CopayServer.prototype.rejectTx = function(opts, cb) {
var action = _.find(txp.actions, {
copayerId: self.copayerId
});
if (action) return cb(new ClientError('CVOTED', 'Copayer already voted on this transaction proposal'));
if (txp.status != 'pending') return cb(new ClientError('TXNOTPENDING', 'The transaction proposal is not pending'));
if (action)
return cb(new ClientError('CVOTED', 'Copayer already voted on this transaction proposal'));
if (txp.status != 'pending')
return cb(new ClientError('TXNOTPENDING', 'The transaction proposal is not pending'));
txp.reject(self.copayerId);
self.storage.storeTx(self.walletId, txp, function(err) {
if (err) return cb(err);
self._notify('TxProposalRejectedBy', {
txProposalId: opts.txProposalId,
copayerId: self.copayerId,
});
if (txp.status == 'rejected') {
self._notify('TxProposalFinallyRejected', {
txProposalId: opts.txProposalId,
});
};
return cb();
});
});
@ -642,10 +716,12 @@ CopayServer.prototype.getPendingTxs = function(opts, cb) {
/**
* Retrieves pending transaction proposals in the range (maxTs-minTs)
* Times are in UNIX EPOCH
*
* @param {Object} opts.minTs (defaults to 0)
* @param {Object} opts.maxTs (defaults to now)
* @param {Object} opts.limit
* @returns {TxProposal[]} Transaction proposal.
* @returns {TxProposal[]} Transaction proposals, first newer
*/
CopayServer.prototype.getTxs = function(opts, cb) {
var self = this;
@ -656,6 +732,27 @@ CopayServer.prototype.getTxs = function(opts, cb) {
};
/**
* Retrieves notifications in the range (maxTs-minTs).
* Times are in UNIX EPOCH. Order is assured even for events with the same time
*
* @param {Object} opts.minTs (defaults to 0)
* @param {Object} opts.maxTs (defaults to now)
* @param {Object} opts.limit
* @param {Object} opts.reverse (default false)
* @returns {Notification[]} Notifications
*/
CopayServer.prototype.getNotifications = function(opts, cb) {
var self = this;
self.storage.fetchNotifications(self.walletId, opts, function(err, notifications) {
if (err) return cb(err);
return cb(null, notifications);
});
};
module.exports = CopayServer;
module.exports.ClientError = ClientError;

93
lib/storage.js

@ -11,6 +11,7 @@ var Wallet = require('./model/wallet');
var Copayer = require('./model/copayer');
var Address = require('./model/address');
var TxProposal = require('./model/txproposal');
var Notification = require('./model/notification');
var Storage = function(opts) {
opts = opts || {};
@ -19,6 +20,9 @@ var Storage = function(opts) {
});
};
var zeroPad = function(x, length) {
return (Array(length).join('0') + parseInt(x)).slice(-length);
};
var walletPrefix = function(id) {
return 'w!' + id;
@ -28,9 +32,9 @@ var opKey = function(key) {
return key ? '!' + key : '';
};
var MAX_TS = '999999999999';
var MAX_TS = Array(14).join('9');
var opKeyTs = function(key) {
return key ? '!' + ('000000000000' + key).slice(-12) : '';
return key ? '!' + zeroPad(key, 14) : '';
};
@ -44,6 +48,9 @@ var KEY = {
TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!txp' + opKey(txProposalId);
},
NOTIFICATION: function(walletId, notificationId) {
return walletPrefix(walletId) + '!not' + opKey(notificationId);
},
PENDING_TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId);
},
@ -107,6 +114,19 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
});
};
Storage.prototype.fetchNotification = function(walletId, notificationId, cb) {
this.db.get(KEY.NOTIFICATION(walletId, notificationId), function(err, data) {
if (err) {
if (err.notFound) return cb();
return cb(err);
}
return cb(null, Notification.fromObj(data));
});
};
Storage.prototype.fetchPendingTxs = function(walletId, cb) {
var txs = [];
var key = KEY.PENDING_TXP(walletId);
@ -127,7 +147,7 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) {
};
/**
* fetchTxs
* fetchTxs. Times are in UNIX EPOCH (seconds)
*
* @param walletId
* @param opts.minTs
@ -138,8 +158,8 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) {
var txs = [];
opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? ('000000000000' + parseInt(opts.minTs)).slice(-12) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? ('000000000000' + parseInt(opts.maxTs)).slice(-12) : MAX_TS;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS;
var key = KEY.TXP(walletId, opts.minTs);
var endkey = KEY.TXP(walletId, opts.maxTs);
@ -162,6 +182,49 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) {
});
};
/**
* fetchNotifications
*
* @param walletId
* @param opts.minTs
* @param opts.maxTs
* @param opts.limit
*/
Storage.prototype.fetchNotifications = function(walletId, opts, cb) {
var txs = [];
opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs,11) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs,11) : MAX_TS;
var key = KEY.NOTIFICATION(walletId, opts.minTs);
var endkey = KEY.NOTIFICATION(walletId, opts.maxTs);
this.db.createReadStream({
gt: key,
lt: endkey + '~',
reverse: opts.reverse,
limit: opts.limit,
})
.on('data', function(data) {
txs.push(Notification.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return cb(null, txs);
});
};
Storage.prototype.storeNotification = function(walletId, notification, cb) {
this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb);
};
// TODO should we store only txp.id on keys for indexing
// or the whole txp? For now, the entire record makes sense
// (faster + easier to access)
@ -251,6 +314,9 @@ Storage.prototype._removeCopayers = function(walletId, cb) {
});
};
Storage.prototype._removeAllNotifications = function(walletId, cb) {
this._delByKey(KEY.NOTIFICATION(walletId), cb);
};
Storage.prototype._removeAllAddresses = function(walletId, cb) {
@ -261,24 +327,13 @@ Storage.prototype.removeWallet = function(walletId, cb) {
var self = this;
async.series([
function(next) {
// This should be the first step. Will check the wallet exists
self._removeCopayers(walletId, next);
},
function(next) {
self._removeAllAddresses(walletId, next);
},
function(next) {
self.removeAllPendingTxs(walletId, next);
},
function(next) {
self.removeAllTxs(walletId, next);
},
function(next) {
var ops = [{
type: 'del',
key: KEY.WALLET(walletId),
}];
self.db.batch(ops, next);
self._delByKey(walletPrefix(walletId), cb);
},
], cb);
};

2
package.json

@ -18,7 +18,7 @@
},
"dependencies": {
"async": "^0.9.0",
"bitcore": "*",
"bitcore": "0.10.0",
"bitcore-explorers": "^0.9.1",
"commander": "^2.6.0",
"express": "^4.10.0",

141
test/integration.js

@ -1209,6 +1209,146 @@ describe('Copay server', function() {
});
describe('Notifications', function() {
var server, wallet, copayerPriv;
beforeEach(function(done) {
if (server) return done();
console.log('\tCreating TXS...');
helpers.createAndJoinWallet(1, 1, function(s, w, c) {
server = s;
wallet = w;
copayerPriv = c;
server.createAddress({}, function(err, address) {
helpers.createUtxos(server, wallet, helpers.toSatoshi(_.range(4)), function(utxos) {
helpers.stubBlockExplorer(server, utxos);
var txOpts = helpers.createProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 0.01, null, copayerPriv[0].privKey);
async.eachSeries(_.range(3), function(i, next) {
server.createTx(txOpts, function(err, tx) {
should.not.exist(err);
next();
});
}, function(err) {
return done(err);
});
});
});
});
});
it('should pull the last 5 notifications after 3 TXs', function(done) {
server.getNotifications({
limit: 5,
reverse: true,
}, function(err, notifications) {
should.not.exist(err);
var types = _.pluck(notifications, 'type');
types.should.deep.equal(['NewTxProposal', 'NewTxProposal', 'NewTxProposal', 'NewAddress', 'NewAddress']);
done();
});
});
it('should pull the last 5 notifications, using now', function(done) {
server.getNotifications({
limit: 5,
reverse: true,
maxTs: Date.now()/1000,
minTs: Date.now()/1000-1000,
}, function(err, notifications) {
should.not.exist(err);
var types = _.pluck(notifications, 'type');
types.should.deep.equal(['NewTxProposal', 'NewTxProposal', 'NewTxProposal', 'NewAddress', 'NewAddress']);
done();
});
});
it('should pull the first 5 notifications after wallet creation', function(done) {
server.getNotifications({
minTs: 0,
limit: 5
}, function(err, notifications) {
should.not.exist(err);
var types = _.pluck(notifications, 'type');
types.should.deep.equal(['NewCopayer', 'NewAddress', 'NewAddress', 'NewAddress', 'NewAddress']);
done();
});
});
it('should notify sign and acceptance', function(done) {
server.getPendingTxs({}, function(err, txs) {
var tx = txs[0];
var signatures = helpers.clientSign(tx, TestData.copayers[0].xPrivKey);
server.signTx({
txProposalId: tx.id,
signatures: signatures,
}, function(err) {
server.getNotifications({
limit: 3,
reverse: true,
}, function(err, notifications) {
should.not.exist(err);
var types = _.pluck(notifications, 'type');
types.should.deep.equal(['TxProposalFinallyAccepted', 'TxProposalAcceptedBy', 'NewTxProposal']);
done();
});
});
});
});
it('should notify rejection', function(done) {
server.getPendingTxs({}, function(err, txs) {
var tx = txs[1];
server.rejectTx({
txProposalId: tx.id,
}, function(err) {
should.not.exist(err);
server.getNotifications({
limit: 2,
reverse: true,
}, function(err, notifications) {
should.not.exist(err);
var types = _.pluck(notifications, 'type');
types.should.deep.equal(['TxProposalFinallyRejected', 'TxProposalRejectedBy']);
done();
});
});
});
});
it('should notify sign, acceptance, and broadcast, and emit', function(done) {
server.getPendingTxs({}, function(err, txs) {
var tx = txs[2];
var signatures = helpers.clientSign(tx, TestData.copayers[0].xPrivKey);
helpers.stubBlockExplorer(server, [], '1122334455');
sinon.spy(server, 'emit');
server.signTx({
txProposalId: tx.id,
signatures: signatures,
}, function(err) {
server.getNotifications({
limit: 3,
reverse: true,
}, function(err, notifications) {
should.not.exist(err);
var types = _.pluck(notifications, 'type');
types.should.deep.equal(['NewOutgoingTx', 'TxProposalFinallyAccepted', 'TxProposalAcceptedBy']);
// Check also events
server.emit.getCall(0).args[0].type.should.equal('TxProposalAcceptedBy');
server.emit.getCall(1).args[0].type.should.equal('TxProposalFinallyAccepted');;
server.emit.getCall(2).args[0].type.should.equal('NewOutgoingTx');
done();
});
});
});
});
});
describe('#removeWallet', function() {
var server, wallet, clock;
@ -1243,6 +1383,7 @@ describe('Copay server', function() {
server.removeWallet({}, function(err) {
i = 0;
server.storage._dump(function() {
server.storage._dump();
i.should.equal(0);
done();
}, count);

Loading…
Cancel
Save