Browse Source

Add transport/connection class

patch-2
Yemel Jardi 10 years ago
parent
commit
f1d2009418
  1. 4
      index.js
  2. 575
      lib/transport/connection.js
  3. 6
      lib/util/buffer.js
  4. 41
      test/transport/connection.js

4
index.js

@ -24,6 +24,10 @@ bitcore.util.bitcoin = require('./lib/util/bitcoin');
bitcore.util.buffer = require('./lib/util/buffer');
bitcore.util.js = require('./lib/util/js');
// transport
bitcore.transport = {};
bitcore.transport.Connection = require('./lib/transport/connection');
// errors thrown by the library
bitcore.errors = require('./lib/errors');

575
lib/transport/connection.js

@ -0,0 +1,575 @@
'use strict';
var MAX_RECEIVE_BUFFER = 10000000;
var PROTOCOL_VERSION = 70000;
var Util = require('util');
var Put = require('bufferput');
var Buffers = require('buffers');
var buffertools = require('buffertools');
// PATCH TODO: Remove (yemel)
Buffers.prototype.skip = function (i) {
if (i == 0) {
return;
} else if (i == this.length) {
this.buffers = [];
this.length = 0;
return;
}
var pos = this.pos(i);
this.buffers = this.buffers.slice(pos.buf);
this.buffers[0] = new Buffer(this.buffers[0].slice(pos.offset));
this.length -= i;
};
var networks = require('../networks');
var Block = require('../block');
var Transaction = require('../transaction');
var BufferUtil = require('../util/buffer');
var BufferReader = require('../encoding/bufferreader');
var Hash = require('../crypto/hash');
var Random = require('../crypto/random');
var nonce = Random.getPseudoRandomBuffer(8);
var EventEmitter = require('events').EventEmitter;
var BIP0031_VERSION = 60000;
function Connection(socket, peer, opts) {
this.config = opts || {};
this.network = this.config.network || networks.livenet;
this.socket = socket;
this.peer = peer;
// check for socks5 proxy options and construct a proxied socket
if (this.config.proxy) {
var Socks5Client = require('socks5-client');
this.socket = new Socks5Client(this.config.proxy.host, this.config.proxy.port);
}
// A connection is considered "active" once we have received verack
this.active = false;
// The version incoming packages are interpreted as
this.recvVer = 0;
// The version outgoing packages are sent as
this.sendVer = 0;
// The (claimed) height of the remote peer's block chain
this.bestHeight = 0;
// Is this an inbound connection?
this.inbound = !!this.socket.server;
// Have we sent a getaddr on this connection?
this.getaddr = false;
// Receive buffer
this.buffers = new Buffers();
// Starting 20 Feb 2012, Version 0.2 is obsolete
// This is the same behavior as the official client
if (new Date().getTime() > 1329696000000) {
this.recvVer = 209;
this.sendVer = 209;
}
this.setupHandlers();
}
Util.inherits(Connection, EventEmitter);
Connection.prototype.open = function(callback) {
if (typeof callback === 'function') this.once('connect', callback);
this.socket.connect(this.peer.port, this.peer.host);
return this;
};
Connection.prototype.setupHandlers = function() {
this.socket.addListener('connect', this.handleConnect.bind(this));
this.socket.addListener('error', this.handleError.bind(this));
this.socket.addListener('end', this.handleDisconnect.bind(this));
this.socket.addListener('data', (function(data) {
var dumpLen = 35;
console.debug('[' + this.peer + '] ' +
'Recieved ' + data.length + ' bytes of data:');
console.debug('... ' + buffertools.toHex(data.slice(0, dumpLen > data.length ?
data.length : dumpLen)) +
(data.length > dumpLen ? '...' : ''));
}).bind(this));
this.socket.addListener('data', this.handleData.bind(this));
};
Connection.prototype.handleConnect = function() {
if (!this.inbound) {
this.sendVersion();
}
this.emit('connect', {
conn: this,
socket: this.socket,
peer: this.peer
});
};
Connection.prototype.handleError = function(err) {
if (err.errno == 110 || err.errno == 'ETIMEDOUT') {
console.info('connection timed out for ' + this.peer);
} else if (err.errno == 111 || err.errno == 'ECONNREFUSED') {
console.info('connection refused for ' + this.peer);
} else {
console.warn('connection with ' + this.peer + ' ' + err.toString());
}
this.emit('error', {
conn: this,
socket: this.socket,
peer: this.peer,
err: err
});
};
Connection.prototype.handleDisconnect = function() {
this.emit('disconnect', {
conn: this,
socket: this.socket,
peer: this.peer
});
};
Connection.prototype.handleMessage = function(message) {
if (!message) {
// Parser was unable to make sense of the message, drop it
return;
}
try {
switch (message.command) {
case 'version':
// Did we connect to ourself?
if (buffertools.compare(nonce, message.nonce) === 0) {
this.socket.end();
return;
}
if (this.inbound) {
this.sendVersion();
}
if (message.version >= 209) {
this.sendMessage('verack', new Buffer([]));
}
this.sendVer = Math.min(message.version, PROTOCOL_VERSION);
if (message.version < 209) {
this.recvVer = Math.min(message.version, PROTOCOL_VERSION);
} else {
// We won't start expecting a checksum until after we've received
// the 'verack' message.
this.once('verack', (function() {
this.recvVer = message.version;
}).bind(this));
}
this.bestHeight = message.start_height;
break;
case 'verack':
this.recvVer = Math.min(message.version, PROTOCOL_VERSION);
this.active = true;
break;
case 'ping':
if ('object' === typeof message.nonce) {
this.sendPong(message.nonce);
}
break;
}
} catch (e) {
console.err('Error while handling "' + message.command + '" message from ' +
this.peer + ':\n' +
(e.stack ? e.stack : e.toString()));
return;
}
this.emit(message.command, {
conn: this,
socket: this.socket,
peer: this.peer,
message: message
});
};
Connection.prototype.sendPong = function(nonce) {
this.sendMessage('pong', nonce);
};
Connection.prototype.sendVersion = function() {
var subversion = '/BitcoinX:0.1/';
var put = new Put();
put.word32le(PROTOCOL_VERSION); // version
put.word64le(1); // services
put.word64le(Math.round(new Date().getTime() / 1000)); // timestamp
put.pad(26); // addr_me
put.pad(26); // addr_you
put.put(nonce);
put.varint(subversion.length);
put.put(new Buffer(subversion, 'ascii'));
put.word32le(0);
this.sendMessage('version', put.buffer());
};
Connection.prototype.sendGetBlocks = function(starts, stop, wantHeaders) {
// Default value for stop is 0 to get as many blocks as possible (500)
stop = stop || BufferUtil.NULL_HASH;
var put = new Put();
// https://en.bitcoin.it/wiki/Protocol_specification#getblocks
put.word32le(this.sendVer);
put.varint(starts.length);
for (var i = 0; i < starts.length; i++) {
if (starts[i].length != 32) {
throw new Error('Invalid hash length');
}
put.put(starts[i]);
}
var stopBuffer = new Buffer(stop, 'binary');
if (stopBuffer.length != 32) {
throw new Error('Invalid hash length');
}
put.put(stopBuffer);
var command = 'getblocks';
if (wantHeaders)
command = 'getheaders';
this.sendMessage(command, put.buffer());
};
Connection.prototype.sendGetHeaders = function(starts, stop) {
this.sendGetBlocks(starts, stop, true);
};
Connection.prototype.sendGetData = function(invs) {
var put = new Put();
put.varint(invs.length);
for (var i = 0; i < invs.length; i++) {
put.word32le(invs[i].type);
put.put(invs[i].hash);
}
this.sendMessage('getdata', put.buffer());
};
Connection.prototype.sendGetAddr = function(invs) {
var put = new Put();
this.sendMessage('getaddr', put.buffer());
};
Connection.prototype.sendInv = function(data) {
if (!Array.isArray(data)) data = [data];
var put = new Put();
put.varint(data.length);
data.forEach(function(value) {
if (value instanceof Block) {
// Block
put.word32le(2); // MSG_BLOCK
} else {
// Transaction
put.word32le(1); // MSG_TX
}
put.put(value.getHash());
});
this.sendMessage('inv', put.buffer());
};
Connection.prototype.sendHeaders = function(headers) {
var put = new Put();
put.varint(headers.length);
headers.forEach(function(header) {
put.put(header);
// Indicate 0 transactions
put.word8(0);
});
this.sendMessage('headers', put.buffer());
};
Connection.prototype.sendTx = function(tx) {
this.sendMessage('tx', tx.serialize());
};
Connection.prototype.sendBlock = function(block, txs) {
var put = new Put();
// Block header
put.put(block.getHeader());
// List of transactions
put.varint(txs.length);
txs.forEach(function(tx) {
put.put(tx.serialize());
});
this.sendMessage('block', put.buffer());
};
Connection.prototype.sendMessage = function(command, payload) {
try {
var magic = this.network.magic;
var commandBuf = new Buffer(command, 'ascii');
if (commandBuf.length > 12) throw 'Command name too long';
var checksum;
if (this.sendVer >= 209) {
checksum = Hash.sha256sha256(payload).slice(0, 4);
} else {
checksum = new Buffer([]);
}
var message = new Put(); // -- HEADER --
message.put(magic); // magic bytes
message.put(commandBuf); // command name
message.pad(12 - commandBuf.length); // zero-padded
message.word32le(payload.length); // payload length
message.put(checksum); // checksum
// -- BODY --
message.put(payload); // payload data
var buffer = message.buffer();
console.debug('[' + this.peer + '] ' +
'Sending message ' + command + ' (' + payload.length + ' bytes)');
this.socket.write(buffer);
} catch (err) {
// TODO: We should catch this error one level higher in order to better
// determine how to react to it. For now though, ignoring it will do.
console.err('Error while sending message to peer ' + this.peer + ': ' +
(err.stack ? err.stack : err.toString()));
}
};
Connection.prototype.handleData = function(data) {
this.buffers.push(data);
if (this.buffers.length > MAX_RECEIVE_BUFFER) {
console.err('Peer ' + this.peer + ' exceeded maxreceivebuffer, disconnecting.' +
(err.stack ? err.stack : err.toString()));
this.socket.destroy();
return;
}
this.processData();
};
Connection.prototype.processData = function() {
// If there are less than 20 bytes there can't be a message yet.
if (this.buffers.length < 20) return;
var magic = this.network.magic;
var i = 0;
for (;;) {
if (this.buffers.get(i) === magic[0] &&
this.buffers.get(i + 1) === magic[1] &&
this.buffers.get(i + 2) === magic[2] &&
this.buffers.get(i + 3) === magic[3]) {
if (i !== 0) {
console.debug('[' + this.peer + '] ' +
'Received ' + i +
' bytes of inter-message garbage: ');
console.debug('... ' + this.buffers.slice(0, i));
this.buffers.skip(i);
}
break;
}
if (i > (this.buffers.length - 4)) {
this.buffers.skip(i);
return;
}
i++;
}
var payloadLen = (this.buffers.get(16)) +
(this.buffers.get(17) << 8) +
(this.buffers.get(18) << 16) +
(this.buffers.get(19) << 24);
var startPos = (this.recvVer >= 209) ? 24 : 20;
var endPos = startPos + payloadLen;
if (this.buffers.length < endPos) return;
var command = this.buffers.slice(4, 16).toString('ascii').replace(/\0+$/, '');
var payload = this.buffers.slice(startPos, endPos);
var checksum = (this.recvVer >= 209) ? this.buffers.slice(20, 24) : null;
console.debug('[' + this.peer + '] ' +
'Received message ' + command +
' (' + payloadLen + ' bytes)');
if (checksum !== null) {
var checksumConfirm = Hash.sha256sha256(payload).slice(0, 4);
if (buffertools.compare(checksumConfirm, checksum) !== 0) {
console.err('[' + this.peer + '] ' +
'Checksum failed', {
cmd: command,
expected: checksumConfirm.toString('hex'),
actual: checksum.toString('hex')
});
return;
}
}
var message;
try {
message = this.parseMessage(command, payload);
} catch (e) {
console.err('Error while parsing message ' + command + ' from ' +
this.peer + ':\n' +
(e.stack ? e.stack : e.toString()));
}
if (message) {
this.handleMessage(message);
}
this.buffers.skip(endPos);
this.processData();
};
Connection.prototype.parseMessage = function(command, payload) {
var parser = new BufferReader(payload);
var data = {
command: command
};
var i;
switch (command) {
case 'version': // https://en.bitcoin.it/wiki/Protocol_specification#version
data.version = parser.readUInt32LE();
data.services = parser.readUInt64LEBN();
data.timestamp = parser.readUInt64LEBN();
data.addr_me = parser.read(26);
data.addr_you = parser.read(26);
data.nonce = parser.read(8);
data.subversion = parser.readVarintBuf();
data.start_height = parser.readUInt32LE();
break;
case 'inv':
case 'getdata':
data.count = parser.readVarintNum();
data.invs = [];
for (i = 0; i < data.count; i++) {
data.invs.push({
type: parser.readUInt32LE(),
hash: parser.read(32)
});
}
break;
case 'headers':
data.count = parser.readVarintNum();
data.headers = [];
for (i = 0; i < data.count; i++) {
var header = Block.fromBufferReader(parser);
data.headers.push(header);
}
break;
case 'block':
var block = Block.fromBufferReader(parser);
data.block = block;
data.version = block.version;
data.prev_hash = block.prev_hash;
data.merkle_root = block.merkle_root;
data.timestamp = block.timestamp;
data.bits = block.bits;
data.nonce = block.nonce;
data.txs = block.txs;
data.size = payload.length;
break;
case 'tx':
var tx = Transaction.fromBufferReader(parser);
return {
command: command,
version: tx.version,
lock_time: tx.lock_time,
ins: tx.ins,
outs: tx.outs,
tx: tx,
};
case 'getblocks':
case 'getheaders':
// parse out the version
data.version = parser.readUInt32LE();
// TODO: Limit block locator size?
// reference implementation limits to 500 results
var startCount = parser.readVarintNum();
data.starts = [];
for (i = 0; i < startCount; i++) {
data.starts.push(parser.read(32));
}
data.stop = parser.read(32);
break;
case 'addr':
var addrCount = parser.readVarintNum();
// Enforce a maximum number of addresses per message
if (addrCount > 1000) {
addrCount = 1000;
}
data.addrs = [];
for (i = 0; i < addrCount; i++) {
// TODO: Time actually depends on the version of the other peer (>=31402)
data.addrs.push({
time: parser.readUInt32LE(),
services: parser.readUInt64LEBN(),
ip: parser.read(16),
port: parser.readUInt16BE()
});
}
break;
case 'alert':
data.payload = parser.readVarintBuf();
data.signature = parser.readVarintBuf();
break;
case 'ping':
if (this.recvVer > BIP0031_VERSION) {
data.nonce = parser.read(8);
}
break;
case 'getaddr':
case 'verack':
case 'reject':
// Empty message, nothing to parse
break;
default:
console.err('Connection.parseMessage(): Command not implemented', {
cmd: command
});
// This tells the calling function not to issue an event
return null;
}
return data;
};
module.exports = Connection;

6
lib/util/buffer.js

@ -2,6 +2,7 @@
var buffer = require('buffer');
var assert = require('assert');
var buffertools = require('buffertools');
var js = require('./js');
@ -19,6 +20,11 @@ function equals(a, b) {
}
module.exports = {
NULL_HASH: buffertools.fill(new Buffer(32), 0),
EMPTY_BUFFER: new Buffer(0),
ZERO_VALUE: buffertools.fill(new Buffer(8), 0),
INT64_MAX: new Buffer('ffffffffffffffff', 'hex'),
/**
* Returns true if the given argument is an instance of a buffer. Tests for
* both node's Buffer and Uint8Array

41
test/transport/connection.js

@ -0,0 +1,41 @@
'use strict';
var chai = require('chai');
var bitcore = require('../..');
var should = chai.should();
var ConnectionModule = bitcore.transport.Connection;
var Connection;
var nop = function() {};
describe.only('Connection', function() {
it('should initialze the main object', function() {
should.exist(ConnectionModule);
});
it('should be able to create class', function() {
Connection = ConnectionModule;
should.exist(Connection);
});
it('should be able to create instance', function() {
var mSocket = {server: null, addListener: nop},
mPeer;
var c = new Connection(mSocket, mPeer);
should.exist(c);
});
if (typeof process !== 'undefined' && process.versions) { //node-only tests
it('should create a proxied socket if instructed', function() {
var mPeer;
var c = new Connection(null, mPeer, {
proxy: { host: 'localhost', port: 9050 }
});
should.exist(c.socket);
});
};
});
Loading…
Cancel
Save