Browse Source

Merge pull request #744 from braydonf/feature/transport-peer-pool

Transport: Added a peer pool to maintain a set of connected peers. Closes #724
patch-2
Manuel Aráoz 10 years ago
parent
commit
7064a07f99
  1. 1
      index.js
  2. 28
      lib/transport/messages.js
  3. 2
      lib/transport/peer.js
  4. 273
      lib/transport/pool.js
  5. 1
      package.json
  6. 40
      test/transport/peer.js
  7. 98
      test/transport/pool.js

1
index.js

@ -29,6 +29,7 @@ bitcore.util.preconditions = require('./lib/util/preconditions');
bitcore.transport = {}; bitcore.transport = {};
bitcore.transport.Peer = require('./lib/transport/peer'); bitcore.transport.Peer = require('./lib/transport/peer');
bitcore.transport.Messages = require('./lib/transport/messages'); bitcore.transport.Messages = require('./lib/transport/messages');
bitcore.transport.Pool = require('./lib/transport/pool');
// errors thrown by the library // errors thrown by the library
bitcore.errors = require('./lib/errors'); bitcore.errors = require('./lib/errors');

28
lib/transport/messages.js

@ -292,11 +292,31 @@ Addresses.prototype.fromBuffer = function(payload) {
this.addresses = []; this.addresses = [];
for (var i = 0; i < addrCount; i++) { for (var i = 0; i < addrCount; i++) {
// TODO: Time actually depends on the version of the other peer (>=31402) // TODO: Time actually depends on the version of the other peer (>=31402)
var time = parser.readUInt32LE();
var services = parser.readUInt64LEBN();
// parse the ipv6 to a string
var ipv6 = [];
for (var a = 0; a < 6; a++) {
ipv6.push(parser.read(2).toString('hex'));
}
ipv6 = ipv6.join(':');
// parse the ipv4 to a string
var ipv4 = [];
for (var b = 0; b < 4; b++) {
ipv4.push(parser.read(1)[0]);
}
ipv4 = ipv4.join('.');
var port = parser.readUInt16BE();
this.addresses.push({ this.addresses.push({
time: parser.readUInt32LE(), time: time,
services: parser.readUInt64LEBN(), services: services,
ip: parser.read(16), ip: { v6: ipv6, v4: ipv4 },
port: parser.readUInt16BE() port: port
}); });
} }

2
lib/transport/peer.js

@ -65,7 +65,7 @@ function Peer(host, port, network) {
}); });
this.on('ping', function(message) { this.on('ping', function(message) {
self.sendPong(message.nonce); self._sendPong(message.nonce);
}); });
} }

273
lib/transport/pool.js

@ -0,0 +1,273 @@
'use strict';
var dns = require('dns');
var EventEmitter = require('events').EventEmitter;
var Networks = require('../networks');
var sha256 = require('../crypto/hash').sha256;
var Peer = require('./peer');
var util = require('util');
function now() {
return Math.floor(new Date().getTime() / 1000);
}
/**
* A pool is a collection of Peers. A pool will discover peers from DNS seeds, and
* collect information about new peers in the network. When a peer disconnects the pool
* will connect to others that are available to maintain a max number of
* ongoing peer connections. Peer events are relayed to the pool.
*
* @example
*
* var pool = new Pool(Networks.livenet);
* pool.on('peerinv', function(peer, message) {
* // do something with the inventory announcement
* });
* pool.connect();
*
* @param {Network|String} network - The network to connect
* @returns {Pool}
* @constructor
*/
function Pool(network) {
var self = this;
this.network = Networks.get(network) || Networks.defaultNetwork;
this.keepalive = false;
this._connectedPeers = {};
this._addrs = [];
this.on('peeraddr', function peerAddrEvent(peer, message) {
var addrs = message.addresses;
var length = addrs.length;
for (var i = 0; i < length; i++) {
var addr = addrs[i];
// In case of an invalid time, assume "5 days ago"
if (addr.time <= 100000000 || addr.time > (now() + 10 * 60)) {
addr.time = now() - 5 * 24 * 60 * 60;
}
this._addAddr(addr);
}
});
this.on('seed', function seedEvent(ips) {
ips.forEach(function(ip) {
self._addAddr({
ip: {
v4: ip
}
});
});
if (self.keepalive) {
self._fillConnections();
}
});
this.on('peerdisconnect', function peerDisconnectEvent(peer, addr) {
self._deprioritizeAddr(addr);
self._removeConnectedPeer(addr);
if (self.keepalive) {
self._fillConnections();
}
});
return this;
}
util.inherits(Pool, EventEmitter);
Pool.MaxConnectedPeers = 8;
Pool.RetrySeconds = 30;
Pool.PeerEvents = ['version', 'inv', 'getdata', 'ping', 'ping', 'addr',
'getaddr', 'verack', 'reject', 'alert', 'headers', 'block',
'tx', 'getblocks', 'getheaders'
];
/**
* Will initiatiate connection to peers, if available peers have been added to
* the pool, it will connect to those, otherwise will use DNS seeds to find
* peers to connect. When a peer disconnects it will add another.
*/
Pool.prototype.connect = function connect() {
this.keepalive = true;
var self = this;
if (self._addrs.length === 0) {
self._addAddrsFromSeeds();
} else {
self._fillConnections();
}
return this;
};
/**
* Will disconnect all peers that are connected.
*/
Pool.prototype.disconnect = function disconnect() {
this.keepalive = false;
for (var i in this._connectedPeers) {
this._connectedPeers[i].disconnect();
}
return this;
};
/**
* @returns {Number} The number of peers currently connected.
*/
Pool.prototype.numberConnected = function numberConnected() {
return Object.keys(this._connectedPeers).length;
};
/**
* Will fill the conneted peers to the maximum amount.
*/
Pool.prototype._fillConnections = function _fillConnections() {
var length = this._addrs.length;
for (var i = 0; i < length; i++) {
if (this.numberConnected() >= Pool.MaxConnectedPeers) {
break;
}
var addr = this._addrs[i];
if (!addr.retryTime || now() > addr.retryTime) {
this._connectPeer(addr);
}
}
return this;
};
/**
* Will remove a peer from the list of connected peers.
* @param {Object} addr - An addr from the list of addrs
*/
Pool.prototype._removeConnectedPeer = function _removeConnectedPeer(addr) {
if (this._connectedPeers[addr.hash].status !== Peer.STATUS.DISCONNECTED) {
this._connectedPeers[addr.hash].disconnect();
} else {
delete this._connectedPeers[addr.hash];
}
return this;
};
/**
* Will connect a peer and add to the list of connected peers.
* @param {Object} addr - An addr from the list of addrs
*/
Pool.prototype._connectPeer = function _connectPeer(addr) {
var self = this;
function addConnectedPeer(addr) {
var port = addr.port || self.network.port;
var ip = addr.ip.v4 || addr.ip.v6;
var peer = new Peer(ip, port, self.network);
peer.on('disconnect', function peerDisconnect() {
self.emit('peerdisconnect', peer, addr);
});
peer.on('ready', function peerReady() {
self.emit('peerready', peer, addr);
});
Pool.PeerEvents.forEach(function addPeerEvents(event) {
peer.on(event, function peerEvent(message) {
self.emit('peer' + event, peer, message);
});
});
peer.connect();
self._connectedPeers[addr.hash] = peer;
}
if (!this._connectedPeers[addr.hash]) {
addConnectedPeer(addr);
}
return this;
};
/**
* Will deprioritize an addr in the list of addrs by moving it to the end
* of the array, and setting a retryTime
* @param {Object} addr - An addr from the list of addrs
*/
Pool.prototype._deprioritizeAddr = function _deprioritizeAddr(addr) {
for (var i = 0; i < this._addrs.length; i++) {
if (this._addrs[i].hash === addr.hash) {
var middle = this._addrs[i];
middle.retryTime = now() + Pool.RetrySeconds;
var beginning = this._addrs.splice(0, i);
var end = this._addrs.splice(i + 1, this._addrs.length);
var combined = beginning.concat(end);
this._addrs = combined.concat([middle]);
}
}
return this;
};
/**
* Will add an addr to the beginning of the addrs array
* @param {Object}
*/
Pool.prototype._addAddr = function _addAddr(addr) {
// make a unique key
addr.hash = sha256(new Buffer(addr.ip.v6 + addr.ip.v4 + addr.port)).toString('hex');
var length = this._addrs.length;
var exists = false;
for (var i = 0; i < length; i++) {
if (this._addrs[i].hash === addr.hash) {
exists = true;
}
}
if (!exists) {
this._addrs.unshift(addr);
}
return this;
};
/**
* Will add addrs to the list of addrs from a DNS seed
* @param {String} seed - A domain name to resolve known peers
* @param {Function} done
*/
Pool.prototype._addAddrsFromSeed = function _addAddrsFromSeed(seed) {
var self = this;
dns.resolve(seed, function(err, ips) {
if (err) {
self.emit('seederror', err);
return;
}
if (!ips || !ips.length) {
self.emit('seederror', new Error('No IPs found from seed lookup.'));
return;
}
// announce to pool
self.emit('seed', ips);
});
return this;
};
/**
* Will add addrs to the list of addrs from network DNS seeds
* @param {Function} done
*/
Pool.prototype._addAddrsFromSeeds = function _addAddrsFromSeeds() {
var self = this;
var seeds = this.network.dnsSeeds;
seeds.forEach(function(seed) {
self._addAddrsFromSeed(seed);
});
return this;
};
/**
* @returns {String} A string formatted for the console
*/
Pool.prototype.inspect = function inspect() {
return '<Pool network: ' +
this.network + ', connected: ' +
this.numberConnected() + ', available: ' +
this._addrs.length + '>';
};
module.exports = Pool;

1
package.json

@ -98,6 +98,7 @@
"gulp-shell": "^0.2.10", "gulp-shell": "^0.2.10",
"mocha": "~2.0.1", "mocha": "~2.0.1",
"run-sequence": "^1.0.2", "run-sequence": "^1.0.2",
"sinon": "^1.12.2",
"karma": "^0.12.28", "karma": "^0.12.28",
"karma-firefox-launcher": "^0.1.3", "karma-firefox-launcher": "^0.1.3",
"karma-mocha": "^0.1.9" "karma-mocha": "^0.1.9"

40
test/transport/peer.js

@ -49,21 +49,27 @@ describe('Peer', function() {
peer.port.should.equal(8111); peer.port.should.equal(8111);
}); });
// only for node TODO: (yemel) if (typeof(window) === 'undefined'){
it.skip('should be able to set a proxy', function() {
var peer, peer2, socket; // Node.js Tests
peer = new Peer('localhost'); it('should be able to set a proxy', function() {
expect(peer.proxy).to.be.undefined(); var peer, peer2, socket;
socket = peer._getSocket();
socket.should.be.instanceof(Net.Socket); peer = new Peer('localhost');
expect(peer.proxy).to.be.undefined();
peer2 = peer.setProxy('127.0.0.1', 9050); socket = peer._getSocket();
peer2.proxy.host.should.equal('127.0.0.1'); socket.should.be.instanceof(Net.Socket);
peer2.proxy.port.should.equal(9050);
socket = peer2._getSocket(); peer2 = peer.setProxy('127.0.0.1', 9050);
socket.should.be.instanceof(Socks5Client); peer2.proxy.host.should.equal('127.0.0.1');
peer2.proxy.port.should.equal(9050);
peer.should.equal(peer2); socket = peer2._getSocket();
}); socket.should.be.instanceof(Socks5Client);
peer.should.equal(peer2);
});
}
}); });

98
test/transport/pool.js

@ -0,0 +1,98 @@
'use strict';
if (typeof(window) === 'undefined'){
// Node.js Tests
var chai = require('chai');
/* jshint unused: false */
var should = chai.should();
var expect = chai.expect;
var dns = require('dns');
var sinon = require('sinon');
var bitcore = require('../..');
var Peer = bitcore.transport.Peer;
var MessagesData = require('../data/messages');
var Messages = bitcore.transport.Messages;
var Pool = bitcore.transport.Pool;
var Networks = bitcore.Networks;
describe('Pool', function() {
it('should be able to create instance', function() {
var pool = new Pool();
pool.network.should.equal(Networks.livenet);
});
it('should be able to create instance setting the network', function() {
var pool = new Peer(Networks.testnet);
pool.network.should.equal(Networks.livenet);
});
it('should discover peers via dns', function() {
var stub = sinon.stub(dns, 'resolve', function(seed, callback){
callback(null, ['10.10.10.1', '10.10.10.2', '10.10.10.3']);
});
var pool = new Pool(Networks.livenet);
pool.connect();
pool.disconnect();
pool._addrs.length.should.equal(3);
stub.restore();
});
it('should not discover peers via dns', function() {
var pool = new Pool();
pool._addAddr({ip: {v4: '10.10.10.1'}});
pool.connect();
pool.disconnect();
pool._addrs.length.should.equal(1);
});
it('should add new addrs as they are announced over the network', function(done) {
// only emit an event, no need to connect
var peerConnectStub = sinon.stub(Peer.prototype, 'connect', function(){
this._readMessage();
this.emit('ready');
});
// mock a addr peer event
var peerMessageStub = sinon.stub(Peer.prototype, '_readMessage', function(){
var payload = new Buffer(MessagesData.ADDR.payload, 'hex');
var message = new Messages.Addresses().fromBuffer(payload);
this.emit(message.command, message);
});
var pool = new Pool();
pool._addAddr({ip: {v4: 'localhost'}});
// listen for the event
pool.on('peeraddr', function(peer, message) {
pool._addrs.length.should.equal(502);
// restore stubs
peerConnectStub.restore();
peerMessageStub.restore();
for (var i = 0; i < pool._addrs.length; i++) {
should.exist(pool._addrs[i].hash);
should.exist(pool._addrs[i].ip);
should.exist(pool._addrs[i].ip.v4);
}
// done
done();
});
pool.connect();
});
});
}
Loading…
Cancel
Save