Browse Source

Merge pull request #158 from isocolsky/incoming_tx

Incoming tx
activeAddress
Matias Alejo Garcia 10 years ago
parent
commit
fe01b8e5a6
  1. 4
      README.md
  2. 57
      lib/blockchainexplorer.js
  3. 100
      lib/blockchainmonitor.js
  4. 25
      lib/eventbroadcaster.js
  5. 4
      lib/model/notification.js
  6. 25
      lib/notificationbroadcaster.js
  7. 68
      lib/server.js
  8. 44
      lib/wsapp.js
  9. 5
      package.json
  10. 35
      test/blockchainexplorer.js
  11. 123
      test/integration/server.js

4
README.md

@ -52,7 +52,7 @@ Optional Arguments:
* limit: Total number of records to return (return all available records if not specified).
Returns:
* History of incomming and outgoing transactions of the wallet. The list is paginated using the `skip` & `limit` params. Each item has the following fields:
* History of incoming and outgoing transactions of the wallet. The list is paginated using the `skip` & `limit` params. Each item has the following fields:
* action ('sent', 'received', 'moved')
* amount
* fees
@ -72,7 +72,7 @@ Returns:
`/v1/addresses/`: Get Wallet's main addresses (does not include change addresses)
Returns:
* List of Addresses object: (https://github.com/bitpay/bitcore-wallet-service/blob/master/lib/model/adddress.js)). This call is mainly provided so the client check this addresses for incomming transactions (using a service like [Insight](https://insight.is)
* List of Addresses object: (https://github.com/bitpay/bitcore-wallet-service/blob/master/lib/model/adddress.js)). This call is mainly provided so the client check this addresses for incoming transactions (using a service like [Insight](https://insight.is)
`/v1/balance/`: Get Wallet's balance

57
lib/blockchainexplorer.js

@ -0,0 +1,57 @@
'use strict';
var _ = require('lodash');
var $ = require('preconditions').singleton();
var log = require('npmlog');
log.debug = log.verbose;
var Explorers = require('bitcore-explorers');
var request = require('request');
var io = require('socket.io-client');
function BlockChainExplorer(opts) {
$.checkArgument(opts);
var provider = opts.provider || 'insight';
var network = opts.network || 'livenet';
var url;
switch (provider) {
case 'insight':
switch (network) {
default:
case 'livenet':
url = 'https://insight.bitpay.com:443';
break;
case 'testnet':
url = 'https://test-insight.bitpay.com:443'
break;
}
var explorer = new Explorers.Insight(url, network);
explorer.getTransactions = _.bind(getTransactionsInsight, explorer, url);
explorer.initSocket = _.bind(initSocketInsight, explorer, url);
return explorer;
default:
throw new Error('Provider ' + provider + ' not supported');
};
};
function getTransactionsInsight(url, addresses, cb) {
request({
method: "POST",
url: url + '/api/addrs/txs',
json: {
addrs: [].concat(addresses).join(',')
}
}, function(err, res, body) {
if (err || res.statusCode != 200) return cb(err || res);
return cb(null, body);
});
};
function initSocketInsight(url) {
var socket = io.connect(url, {});
return socket;
};
module.exports = BlockChainExplorer;

100
lib/blockchainmonitor.js

@ -0,0 +1,100 @@
'use strict';
var $ = require('preconditions').singleton();
var _ = require('lodash');
var async = require('async');
var log = require('npmlog');
log.debug = log.verbose;
var Uuid = require('uuid');
var inherits = require('inherits');
var events = require('events');
var nodeutil = require('util');
var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore;
var WalletService = require('./server');
var BlockchainExplorer = require('./blockchainexplorer');
var Notification = require('./model/notification');
function BlockchainMonitor() {
var self = this;
this.subscriptions = {};
this.subscriber = {};
this.subscriber['livenet'] = self._getAddressSubscriber('insight', 'livenet');
this.subscriber['testnet'] = self._getAddressSubscriber('insight', 'testnet');
};
nodeutil.inherits(BlockchainMonitor, events.EventEmitter);
BlockchainMonitor.prototype._getAddressSubscriber = function(provider, network) {
$.checkArgument(provider == 'insight', 'Blockchain monitor ' + provider + ' not supported');
var explorer = new BlockchainExplorer({
provider: provider,
network: network,
});
var socket = explorer.initSocket();
// TODO: Extract on its own class once more providers are implemented
return {
subscribe: function(address, handler) {
socket.emit('subscribe', address);
socket.on(address, handler);
},
};
};
BlockchainMonitor.prototype.subscribeAddresses = function(walletId, addresses) {
$.checkArgument(walletId);
var self = this;
if (!addresses || addresses.length == 0) return;
function handlerFor(address, txid) {
var notification = Notification.create({
walletId: this,
type: 'NewIncomingTx',
data: {
address: address,
txid: txid,
},
});
self.emit('notification', notification);
};
if (!self.subscriptions[walletId]) {
self.subscriptions[walletId] = {
addresses: [],
};
};
var addresses = [].concat(addresses);
var network = Bitcore.Address.fromString(addresses[0]).network.name;
var subscriber = self.subscriber[network];
_.each(addresses, function(address) {
self.subscriptions[walletId].addresses.push(address);
subscriber.subscribe(address, _.bind(handlerFor, walletId, address));
});
};
BlockchainMonitor.prototype.subscribeWallet = function(walletService, cb) {
var self = this;
var walletId = walletService.walletId;
if (self.subscriptions[walletId]) return;
walletService.getMainAddresses({}, function(err, addresses) {
if (err) {
delete self.subscriptions[walletId];
return cb(new Error('Could not subscribe to addresses for wallet ' + walletId));
}
self.subscribeAddresses(walletService.walletId, _.pluck(addresses, 'address'));
return cb();
});
};
module.exports = BlockchainMonitor;

25
lib/eventbroadcaster.js

@ -1,25 +0,0 @@
'use strict';
var log = require('npmlog');
log.debug = log.verbose;
var inherits = require('inherits');
var events = require('events');
var nodeutil = require('util');
function EventBroadcaster() {};
nodeutil.inherits(EventBroadcaster, events.EventEmitter);
EventBroadcaster.prototype.broadcast = function(eventName, serviceInstance, args) {
this.emit(eventName, serviceInstance, args);
};
var _eventBroadcasterInstance;
EventBroadcaster.singleton = function() {
if (!_eventBroadcasterInstance) {
_eventBroadcasterInstance = new EventBroadcaster();
}
return _eventBroadcasterInstance;
};
module.exports = EventBroadcaster.singleton();

4
lib/model/notification.js

@ -12,8 +12,8 @@ var Uuid = require('uuid');
* txProposalFinallyRejected - txProposalId
* txProposalFinallyAccepted - txProposalId
*
* newIncommingTx (amount)
* newOutgoingTx - (txProposalId, txid)
* NewIncomingTx (address, txid)
* NewOutgoingTx - (txProposalId, txid)
*
* data Examples:
* { amount: 'xxx', address: 'xxx'}

25
lib/notificationbroadcaster.js

@ -0,0 +1,25 @@
'use strict';
var log = require('npmlog');
log.debug = log.verbose;
var inherits = require('inherits');
var events = require('events');
var nodeutil = require('util');
function NotificationBroadcaster() {};
nodeutil.inherits(NotificationBroadcaster, events.EventEmitter);
NotificationBroadcaster.prototype.broadcast = function(eventName, notification) {
this.emit(eventName, notification);
};
var _instance;
NotificationBroadcaster.singleton = function() {
if (!_instance) {
_instance = new NotificationBroadcaster();
}
return _instance;
};
module.exports = NotificationBroadcaster.singleton();

68
lib/server.js

@ -10,12 +10,12 @@ var Bitcore = WalletUtils.Bitcore;
var PublicKey = Bitcore.PublicKey;
var HDPublicKey = Bitcore.HDPublicKey;
var Address = Bitcore.Address;
var Explorers = require('bitcore-explorers');
var ClientError = require('./clienterror');
var Utils = require('./utils');
var Storage = require('./storage');
var EventBroadcaster = require('./eventbroadcaster');
var NotificationBroadcaster = require('./notificationbroadcaster');
var BlockchainExplorer = require('./blockchainexplorer');
var Wallet = require('./model/wallet');
var Copayer = require('./model/copayer');
@ -24,7 +24,7 @@ var TxProposal = require('./model/txproposal');
var Notification = require('./model/notification');
var initialized = false;
var storage, blockExplorer;
var storage, blockchainExplorer;
/**
@ -36,24 +36,24 @@ function WalletService() {
throw new Error('Server not initialized');
this.storage = storage;
this.blockExplorer = blockExplorer;
this.blockchainExplorer = blockchainExplorer;
this.notifyTicker = 0;
};
WalletService.onNotification = function(func) {
EventBroadcaster.on('notification', func);
NotificationBroadcaster.on('notification', func);
};
/**
* Initializes global settings for all instances.
* @param {Object} opts
* @param {Storage} [opts.storage] - The storage provider.
* @param {Storage} [opts.blockExplorer] - The blockExporer provider.
* @param {Storage} [opts.blockchainExplorer] - The blockchainExporer provider.
*/
WalletService.initialize = function(opts) {
opts = opts || {};
storage = opts.storage ||  new Storage();
blockExplorer = opts.blockExplorer;
blockchainExplorer = opts.blockchainExplorer;
initialized = true;
};
@ -167,7 +167,7 @@ WalletService.prototype._verifySignature = function(text, signature, pubKey) {
* @param {Object} args
*/
WalletService.prototype._emit = function(eventName, args) {
EventBroadcaster.broadcast(eventName, this, args);
NotificationBroadcaster.broadcast(eventName, args);
};
/**
@ -284,7 +284,9 @@ WalletService.prototype.createAddress = function(opts, cb) {
self.storage.storeAddressAndWallet(wallet, address, function(err) {
if (err) return cb(err);
self._notify('NewAddress');
self._notify('NewAddress', {
address: address.address,
});
return cb(null, address);
});
});
@ -333,43 +335,15 @@ WalletService.prototype.verifyMessageSignature = function(opts, cb) {
};
WalletService.prototype._getBlockExplorer = function(provider, network) {
var url;
function getTransactionsInsight(url, addresses, cb) {
var request = require('request');
request({
method: "POST",
url: url + '/api/addrs/txs',
json: {
addrs: [].concat(addresses).join(',')
}
}, function(err, res, body) {
if (err || res.statusCode != 200) return cb(err || res);
return cb(null, body);
WalletService.prototype._getBlockchainExplorer = function(provider, network) {
if (!this.blockchainExplorer) {
this.blockchainExplorer = new BlockchainExplorer({
provider: provider,
network: network,
});
};
if (this.blockExplorer)
return this.blockExplorer;
switch (provider) {
default: ;
case 'insight':
switch (network) {
default:
case 'livenet':
url = 'https://insight.bitpay.com:443';
break;
case 'testnet':
url = 'https://test-insight.bitpay.com:443'
break;
}
var bc = new Explorers.Insight(url, network);
bc.getTransactions = _.bind(getTransactionsInsight, bc, url);
return bc;
break;
}
return this.blockchainExplorer;
};
/**
@ -389,7 +363,7 @@ WalletService.prototype._getUtxos = function(cb) {
var addressToPath = _.indexBy(addresses, 'address'); // TODO : check performance
var networkName = Bitcore.Address(addressStrs[0]).toObject().network;
var bc = self._getBlockExplorer('insight', networkName);
var bc = self._getBlockchainExplorer('insight', networkName);
bc.getUnspentUtxos(addressStrs, function(err, inutxos) {
if (err) return cb(err);
var utxos = _.map(inutxos, function(i) {
@ -695,7 +669,7 @@ WalletService.prototype._broadcastTx = function(txp, cb) {
} catch (ex) {
return cb(ex);
}
var bc = this._getBlockExplorer('insight', txp.getNetworkName());
var bc = this._getBlockchainExplorer('insight', txp.getNetworkName());
bc.broadcast(raw, function(err, txid) {
return cb(err, txid);
})
@ -1040,7 +1014,7 @@ WalletService.prototype.getTxHistory = function(opts, cb) {
var addressStrs = _.pluck(addresses, 'address');
var networkName = Bitcore.Address(addressStrs[0]).toObject().network;
var bc = self._getBlockExplorer('insight', networkName);
var bc = self._getBlockchainExplorer('insight', networkName);
async.parallel([
function(next) {

44
lib/wsapp.js

@ -4,17 +4,19 @@ var $ = require('preconditions').singleton();
var _ = require('lodash');
var async = require('async');
var log = require('npmlog');
var express = require('express');
var querystring = require('querystring');
var bodyParser = require('body-parser')
log.debug = log.verbose;
var Uuid = require('uuid');
var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore;
var WalletService = require('./server');
var BlockchainMonitor = require('./blockchainmonitor')
var Notification = require('./model/notification');
log.debug = log.verbose;
log.level = 'debug';
var subscriptions = {};
var io, bcMonitor;
var WsApp = function() {};
@ -23,17 +25,27 @@ WsApp._unauthorized = function() {
socket.disconnect();
};
WsApp.handleNotification = function(service, notification) {
if (notification.type == 'NewAddress') {
self.subscribeAddresses(notification.walletId, notification.data.address);
}
io.to(notification.walletId).emit('notification', notification);
};
WsApp.start = function(server) {
var self = this;
io = require('socket.io')(server);
var io = require('socket.io')(server);
bcMonitor = new BlockchainMonitor();
WalletService.onNotification(function(serviceInstance, args) {
var room = serviceInstance.walletId || args.walletId;
if (room) {
io.to(room).emit('notification', args);
function handleNotification(notification) {
if (notification.type == 'NewAddress') {
bcMonitor.subscribeAddresses(notification.walletId, notification.data.address);
}
});
io.to(notification.walletId).emit('notification', notification);
};
bcMonitor.on('notification', handleNotification);
WalletService.onNotification(handleNotification);
io.on('connection', function(socket) {
socket.nonce = Uuid.v4();
@ -42,11 +54,15 @@ WsApp.start = function(server) {
socket.on('authorize', function(data) {
if (data.message != socket.nonce) return WsApp.unauthorized();
WalletService.getInstanceWithAuth(data, function(err, res) {
WalletService.getInstanceWithAuth(data, function(err, service) {
if (err) return WsApp.unauthorized();
socket.join(res.walletId);
socket.join(service.walletId);
socket.emit('authorized');
bcMonitor.subscribeWallet(service, function(err) {
if (err) log.warn(err.message);
});
});
});
});

5
package.json

@ -2,7 +2,7 @@
"name": "bitcore-wallet-service",
"description": "A service for Mutisig HD Bitcoin Wallets",
"author": "BitPay Inc",
"version": "0.0.16",
"version": "0.0.17",
"keywords": [
"bitcoin",
"copay",
@ -19,8 +19,8 @@
},
"dependencies": {
"async": "^0.9.0",
"bitcore-wallet-utils": "^0.0.7",
"bitcore-explorers": "^0.9.1",
"bitcore-wallet-utils": "^0.0.7",
"body-parser": "^1.11.0",
"coveralls": "^2.11.2",
"express": "^4.10.0",
@ -36,6 +36,7 @@
"request": "^2.53.0",
"sjcl": "^1.0.2",
"socket.io": "^1.3.5",
"socket.io-client": "^1.3.5",
"uuid": "*"
},
"devDependencies": {

35
test/blockchainexplorer.js

@ -0,0 +1,35 @@
'use strict';
var _ = require('lodash');
var chai = require('chai');
var sinon = require('sinon');
var should = chai.should();
var BlockchainExplorer = require('../lib/blockchainexplorer');
describe('Blockchain explorer', function() {
describe('#constructor', function() {
it('should return a blockchain explorer with basic methods', function() {
var exp = BlockchainExplorer({
provider: 'insight',
network: 'testnet',
});
should.exist(exp);
exp.should.respondTo('broadcast');
exp.should.respondTo('getTransactions');
exp.should.respondTo('getUnspentUtxos');
exp.should.respondTo('initSocket');
var exp = BlockchainExplorer({
provider: 'insight',
network: 'livenet',
});
should.exist(exp);
});
it('should fail on unsupported provider', function() {
(function() {
var exp = BlockchainExplorer({
provider: 'dummy',
});
}).should.throw('not supported');
});
});
});

123
test/integration/server.js

@ -16,6 +16,7 @@ var Utils = require('../../lib/utils');
var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore;
var Storage = require('../../lib/storage');
var BlockchainMonitor = require('../../lib/blockchainmonitor');
var Wallet = require('../../lib/model/wallet');
var TxProposal = require('../../lib/model/txproposal');
@ -150,22 +151,22 @@ helpers.stubUtxos = function(server, wallet, amounts, cb) {
};
return obj;
});
blockExplorer.getUnspentUtxos = sinon.stub().callsArgWith(1, null, utxos);
blockchainExplorer.getUnspentUtxos = sinon.stub().callsArgWith(1, null, utxos);
return cb(utxos);
});
};
helpers.stubBroadcast = function(txid) {
blockExplorer.broadcast = sinon.stub().callsArgWith(1, null, txid);
blockchainExplorer.broadcast = sinon.stub().callsArgWith(1, null, txid);
};
helpers.stubBroadcastFail = function() {
blockExplorer.broadcast = sinon.stub().callsArgWith(1, 'broadcast error');
blockchainExplorer.broadcast = sinon.stub().callsArgWith(1, 'broadcast error');
};
helpers.stubHistory = function(txs) {
blockExplorer.getTransactions = sinon.stub().callsArgWith(1, null, txs);
blockchainExplorer.getTransactions = sinon.stub().callsArgWith(1, null, txs);
};
helpers.clientSign = WalletUtils.signTxp;
@ -198,10 +199,10 @@ helpers.createAddresses = function(server, wallet, main, change, cb) {
});
};
var db, storage, blockExplorer;
var db, storage, blockchainExplorer;
describe('Copay server', function() {
describe('Wallet service', function() {
beforeEach(function() {
db = levelup(memdown, {
valueEncoding: 'json'
@ -209,11 +210,11 @@ describe('Copay server', function() {
storage = new Storage({
db: db
});
blockExplorer = sinon.stub();
blockchainExplorer = sinon.stub();
WalletService.initialize({
storage: storage,
blockExplorer: blockExplorer,
blockchainExplorer: blockchainExplorer,
});
helpers.offset = 0;
});
@ -608,7 +609,15 @@ describe('Copay server', function() {
address.address.should.equal('3KxttbKQQPWmpsnXZ3rB4mgJTuLnVR7frg');
address.isChange.should.be.false;
address.path.should.equal('m/2147483647/0/0');
done();
server.getNotifications({}, function(err, notifications) {
should.not.exist(err);
var notif = _.find(notifications, {
type: 'NewAddress'
});
should.exist(notif);
notif.data.address.should.equal('3KxttbKQQPWmpsnXZ3rB4mgJTuLnVR7frg');
done();
});
});
});
@ -691,7 +700,7 @@ describe('Copay server', function() {
});
});
it('should get balance when there are no funds', function(done) {
blockExplorer.getUnspentUtxos = sinon.stub().callsArgWith(1, null, []);
blockchainExplorer.getUnspentUtxos = sinon.stub().callsArgWith(1, null, []);
server.createAddress({}, function(err, address) {
should.not.exist(err);
server.getBalance({}, function(err, balance) {
@ -2465,3 +2474,97 @@ describe('Copay server', function() {
});
});
});
describe('Blockchain monitor', function() {
var addressSubscriber;
beforeEach(function() {
db = levelup(memdown, {
valueEncoding: 'json'
});
storage = new Storage({
db: db
});
blockchainExplorer = sinon.stub();
WalletService.initialize({
storage: storage,
blockchainExplorer: blockchainExplorer,
});
helpers.offset = 0;
addressSubscriber = sinon.stub();
addressSubscriber.subscribe = sinon.stub();
sinon.stub(BlockchainMonitor.prototype, '_getAddressSubscriber').onFirstCall().returns(addressSubscriber);
});
afterEach(function() {
BlockchainMonitor.prototype._getAddressSubscriber.restore();
});
it('should subscribe wallet', function(done) {
var monitor = new BlockchainMonitor();
helpers.createAndJoinWallet(2, 2, function(server, wallet) {
server.createAddress({}, function(err, address1) {
should.not.exist(err);
server.createAddress({}, function(err, address2) {
should.not.exist(err);
monitor.subscribeWallet(server, function(err) {
should.not.exist(err);
addressSubscriber.subscribe.calledTwice.should.be.true;
addressSubscriber.subscribe.calledWith(address1.address).should.be.true;
addressSubscriber.subscribe.calledWith(address2.address).should.be.true;
done();
});
});
});
});
});
it('should be able to subscribe new address', function(done) {
var monitor = new BlockchainMonitor();
helpers.createAndJoinWallet(2, 2, function(server, wallet) {
server.createAddress({}, function(err, address1) {
should.not.exist(err);
monitor.subscribeWallet(server, function(err) {
should.not.exist(err);
addressSubscriber.subscribe.calledOnce.should.be.true;
addressSubscriber.subscribe.calledWith(address1.address).should.be.true;
server.createAddress({}, function(err, address2) {
should.not.exist(err);
monitor.subscribeAddresses(wallet.id, address2.address);
addressSubscriber.subscribe.calledTwice.should.be.true;
addressSubscriber.subscribe.calledWith(address2.address).should.be.true;
done();
});
});
});
});
});
it('should create NewIncomingTx notification when a new tx arrives on registered address', function(done) {
var monitor = new BlockchainMonitor();
helpers.createAndJoinWallet(2, 2, function(server, wallet) {
server.createAddress({}, function(err, address1) {
should.not.exist(err);
monitor.subscribeWallet(server, function(err) {
should.not.exist(err);
addressSubscriber.subscribe.calledOnce.should.be.true;
addressSubscriber.subscribe.getCall(0).args[0].should.equal(address1.address);
var handler = addressSubscriber.subscribe.getCall(0).args[1];
_.isFunction(handler).should.be.true;
monitor.on('notification', function(notification) {
notification.type.should.equal('NewIncomingTx');
notification.data.address.should.equal(address1.address);
notification.data.txid.should.equal('txid');
done();
});
handler('txid');
});
});
});
});
});

Loading…
Cancel
Save