diff --git a/.gitignore b/.gitignore index 3c3629e..7938f33 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ node_modules +sandbox.js diff --git a/README.md b/README.md index b60d28a..021232a 100644 --- a/README.md +++ b/README.md @@ -5,8 +5,15 @@ Make RPC calls over a [Kademlia](https://pdos.csail.mit.edu/~petar/papers/maymou ``` npm install dht-rpc ``` + [![build status](http://img.shields.io/travis/mafintosh/dht-rpc.svg?style=flat)](http://travis-ci.org/mafintosh/dht-rpc) +## Key Features + +* UDP hole punching support +* Easily add any command to your DHT +* Streaming queries and updates + ## Usage Here is an example implementing a simple key value store @@ -14,10 +21,10 @@ Here is an example implementing a simple key value store First spin up a bootstrap node. You can make multiple if you want for redundancy. ``` js -var dht = require('dht-rpc') +const dht = require('dht-rpc') // Set ephemeral: true so other peers do not add us to the peer list, simply bootstrap -var bootstrap = dht({ephemeral: true}) +const bootstrap = dht({ ephemeral: true }) bootstrap.listen(10001) ``` @@ -25,39 +32,42 @@ bootstrap.listen(10001) Now lets make some dht nodes that can store values in our key value store. ``` js -var dht = require('dht-rpc') -var crypto = require('crypto') +const dht = require('dht-rpc') +const crypto = require('crypto') // Let's create 100 dht nodes for our example. for (var i = 0; i < 100; i++) createNode() function createNode () { - var node = dht({ - bootstrap: ['localhost:10001'] - }) - - var values = {} - - // When we are the closest node and someone is sending us a "store" command - node.on('update:values', function (query, cb) { - if (!query.value) return cb() - - // Use the hash of the value as the key - var key = sha256(query.value).toString('hex') - values[key] = query.value - console.log('Storing', key, '-->', query.value.toString()) - cb() + const node = dht({ + bootstrap: [ + 'localhost:10001' + ] }) - // When someone is querying for a "lookup" command - node.on('query:values', function (query, cb) { - var value = values[query.target.toString('hex')] - cb(null, value) + const values = new Map() + + node.command('values', { + // When we are the closest node and someone is sending us a "store" command + update (query, cb) { + if (!query.value) return cb() + + // Use the hash of the value as the key + const key = sha256(query.value).toString('hex') + values.set(key, query.value) + console.log('Storing', key, '-->', query.value.toString()) + cb() + }, + // When someone is querying for a "lookup" command + query (query, cb) { + const value = values[query.target.toString('hex')] + cb(null, value) + } }) } function sha256 (val) { - return crypto.createHash('sha256').update(val).digest('hex') + return crypto.createHash('sha256').update(val).digest() } ``` @@ -65,9 +75,9 @@ To insert a value into this dht make another script that does this following ``` js // Set ephemeral: true as we are not part of the network. -var node = dht({ephemeral: true}) +const node = dht({ ephemeral: true }) -node.update({command: 'values', target: sha256(val), value: val}, function (err, res) { +node.update('values', sha256(val), value, function (err, res) { if (err) throw err console.log('Inserted', sha256(val).toString('hex')) }) @@ -76,7 +86,7 @@ node.update({command: 'values', target: sha256(val), value: val}, function (err, Then after inserting run this script to query for a value ``` js -node.query({command: 'values', target: new Buffer(hexFromAbove, 'hex')}) +node.query('values', Buffer.from(hexFromAbove, 'hex')) .on('data', function (data) { if (data.value && sha256(data.value).toString('hex') === hexFromAbove) { // We found the value! Destroy the query stream as there is no need to continue. @@ -91,74 +101,135 @@ node.query({command: 'values', target: new Buffer(hexFromAbove, 'hex')}) ## API -#### `var node = dht([options])` +#### `const node = dht([options])` -Create a new DHT node. Options include +Create a new DHT node. -``` js +Options include: + +```js { - id: nodeId, // id of the node - ephemeral: false, // will this node answer queries? - bootstrap: ['host:port'], // bootstrap nodes - socket: udpSocket // optional udp socket + // Whether or not this node is ephemeral or should join the routing table + ephemeral: false, + // A list of bootstrap nodes + bootstrap: [ 'bootstrap-node.com:24242', ... ], + // Optionally pass in your own UDP socket to use. + socket: udpSocket } ``` -#### `var stream = node.query(query, [options], [callback])` +#### `node.command(name, cmd)` -Create a new query. Query should look like this +Define a new RPC command. `cmd` should look like this -``` js +```js { - command: 'command-to-run', - target: new Buffer('32 byte target'), - value: new Buffer('some payload') + // Query handler + query (query, cb), + // Update handler. only triggered when we are one of the closest nodes to the target + update (query, cb), + // Optional value encoding for the query/update incoming value. Defaults to binary. + inputEncoding: 'json', 'utf-8', object, + // Optional value encoding for the query/update outgoing value. Defaults to binary. + outputEncoding: (same as above), + valueEncoding: (sets both input/output encoding to this) } ``` -And options include +The `query` object in the query/update function looks like this: -``` js +```js +{ + // always the same as your command def + command: 'command-name', + // the node who sent the query/update + node: { port, host, id }, + // the query/update target (32 byte target) + target: Buffer, + // the query/update payload decoded with the inputEncoding + value +} +``` + +You should call the query/update callback with `(err, value)` where +value will be encoded using the outputEncoding and returned to the node. + +#### `const stream = node.query(name, target, [value], [callback])` + +Send a query command. + +If you set a valueEncoding when defining the command the value will be encoded. + +Returns a result stream that emits data that looks like this: + +```js { - nodes: [{host: 'example.com', port: 4224}], // only contact these nodes - holepunching: true // set to false to disable hole punching + // was this a query/update response + type: dht.QUERY, + // who sent this response + node: { peer, host, id }, + // the response payload decoded using the outputEncoding + value } ``` -The stream will emit query results as they arrive. If you backpressure the query it will backpressure the query as well. -Call `.destroy()` on the stream to cancel the query. If you pass the callback the streams payload will be buffered and passed to that. +If you pass a callback the stream will be error handled and buffered +and the content passed as an array. + +#### `const stream = node.update(name, target, [value], [callback])` + +Send a update command -#### `var stream = node.update(query, [options], [callback])` +Same options/results as above but the response data will have `type` +set to `dht.UPDATE`. -Same as a query but will trigger an update query on the 20 closest nodes (distance between node ids and target) after the query finishes. -Per default the stream will only contain results from the closest query. To include the query results also pass the `query: true` option. +#### `const stream = node.queryAndUpdate(name, target, [value], [callback])` -#### `node.on('query:{command}', data, callback)` +Send a combined query and update command. -Called when a specific query is invoked on a node. `data` contains the same values as in the query above and also a `.node` property with info about the node invoking the query. +Will keep querying until it finds the closest nodes to the target and then +issue an update. More efficient than doing a query/update yourself. -Call the callback with `(err, value)` to respond. +Same options/results as above but the response data will include both +query and update results. -#### `node.on('update:{command}', data, callback)` +#### `node.destroy(onclose)` -Called when an update query is invoked. The `data.node` is also guaranteed to have roundtripped to this dht before, meaning that you can trust that the host, port was not spoofed. +Fully destroys the dht node. -#### `node.ready(callback)` +#### `node.bootstrap(cb)` -Makes sure the initial bootstrap table has been built. You do not need to wait for this before querying. +Re-bootstrap the DHT node. Normally you shouldn't have to call this. + +#### `node.holepunch(peer, cb)` + +UDP holepunch to another peer. The DHT does this automatically +when it cannot reach another peer but you can use this yourself also. + +Peer should look like this: + +```js +{ + port, + host, + // referrer should be the node/peer that + // told you about this node. + referrer: { port, host } +} +``` -#### `node.bootstrap([callback])` +#### `node.listen([port], [address], [onlistening])` -Rebootstrap your node. Call this at regular intervals if you aren't doing any other queries. +Explicitly bind the dht node to a certain port/address. -#### `node.holepunch(peer, referrer, callback)` +#### `node.on('listening')` -UDP hole punch to another peer using the `referrer` as a STUN server. +Emitted when the node starts listening on a udp port. -#### `node.destroy()` +#### `node.on('close')` -Destroy the dht node. Releases all resources. +Emitted when the node is fully closed. -## License +#### `node.on('holepunch', fromPeer, toPeer)` -MIT +Emitted when the node is helping `fromPeer` udp holepunch to `toPeer`. diff --git a/example.js b/example.js index a2631f1..614440f 100644 --- a/example.js +++ b/example.js @@ -1,46 +1,50 @@ -var dht = require('./') -var blake2b = require('./blake2b') - -var node = dht({ - bootstrap: 'localhost:49737', - ephemeral: !!process.argv[2] -}) - -var values = {} - -node.on('update:store', function (query, cb) { - console.log('(onupdate)') - if (!query.value) return cb() - var key = blake2b(query.value).toString('hex') - values[key] = query.value - console.log('Storing', key, '-->', query.value.toString()) - cb() -}) - -node.on('query:lookup', function (query, cb) { - console.log('(onquery)') - var value = values[query.target.toString('hex')] - cb(null, value) -}) - -if (process.argv.length > 3) { - var val = process.argv.slice(3).join(' ') - if (process.argv[2] === 'put') { - node.update({command: 'store', target: blake2b(Buffer.from(val)), value: val}, function (err) { - if (err) throw err - console.log('Inserted', blake2b(Buffer.from(val)).toString('hex')) - }) - } - if (process.argv[2] === 'get') { - node.query({command: 'lookup', target: Buffer.from(val, 'hex')}) - .on('data', function (data) { - if (data.value && blake2b(data.value).toString('hex') === val) { - console.log(val, '-->', data.value.toString()) - this.destroy() - } - }) - .on('end', function () { - console.log('(query finished)') - }) - } +const dht = require('./') + +const bootstrap = dht() +bootstrap.listen(10001) + +const nodes = [] +var swarm = 1000 +loop(null) + +function loop (err) { + if (err) throw err + if (swarm--) addNode(loop) + else done() +} + +function done () { + console.log('executing hi update') + + const i = Math.floor(Math.random() * nodes.length) + const rs = nodes[i].update('hi', Buffer.alloc(32)) + + rs.resume() + rs.on('end', function () { + setTimeout(done, 2000) + }) +} + +function addNode (cb) { + const node = dht({ + bootstrap: [ + 10001 + ] + }) + + var hits = 0 + node.command('hi', { + update (query, cb) { + console.log('hi', ++hits) + cb(null) + }, + query (query, cb) { + cb(null) + } + }) + + node.once('ready', function () { + nodes.push(node) + cb() + }) } diff --git a/index.js b/index.js index cca77da..2317080 100644 --- a/index.js +++ b/index.js @@ -1,446 +1,384 @@ -var udp = require('udp-request') -var KBucket = require('k-bucket') -var inherits = require('inherits') -var events = require('events') -var peers = require('ipv4-peers') -var collect = require('stream-collector') -var sodium = require('sodium-universal') -var tos = require('time-ordered-set') -var nodes = peers.idLength(32) -var messages = require('./messages') -var queryStream = require('./query-stream') -var blake2b = require('./blake2b') - -module.exports = DHT - -function DHT (opts) { - if (!(this instanceof DHT)) return new DHT(opts) - if (!opts) opts = {} - - events.EventEmitter.call(this) - - var self = this - - this.concurrency = opts.concurrency || 16 - this.id = opts.id || randomBytes(32) - this.ephemeral = !!opts.ephemeral - this.bucket = new KBucket({localNodeId: this.id, arbiter: arbiter}) - this.bucket.on('ping', onnodeping) - this.inflightQueries = 0 - - this.socket = udp({ - socket: opts.socket, - requestEncoding: messages.Request, - responseEncoding: messages.Response - }) - - this.socket.on('request', onrequest) - this.socket.on('response', onresponse) - this.socket.on('close', onclose) - - this.nodes = tos() - - this._bootstrap = [].concat(opts.bootstrap || []).map(parseAddr) - this._queryId = this.ephemeral ? null : this.id - this._bootstrapped = false - this._pendingRequests = [] - this._tick = 0 - this._secrets = [randomBytes(32), randomBytes(32)] - this._secretsInterval = setInterval(rotateSecrets, 5 * 60 * 1000) - this._tickInterval = setInterval(tick, 5 * 1000) - - if (opts.nodes) { - for (var i = 0; i < opts.nodes.length; i++) { - this._addNode(opts.nodes[i].id, opts.nodes[i]) - } +const { EventEmitter } = require('events') +const peers = require('ipv4-peers') +const dgram = require('dgram') +const sodium = require('sodium-universal') +const KBucket = require('k-bucket') +const tos = require('time-ordered-set') +const collect = require('stream-collector') +const codecs = require('codecs') +const { Message, Holepunch } = require('./lib/messages') +const IO = require('./lib/io') +const QueryStream = require('./lib/query-stream') +const blake2b = require('./lib/blake2b') + +const UNSUPPORTED_COMMAND = new Error('Unsupported command') +const nodes = peers.idLength(32) + +exports = module.exports = opts => new DHT(opts) + +class DHT extends EventEmitter { + constructor (opts) { + if (!opts) opts = {} + + super() + + this.bootstrapped = false + this.destroyed = false + this.concurrency = 16 + this.socket = dgram.createSocket('udp4') + this.id = randomBytes(32) + this.inflightQueries = 0 + this.ephemeral = !!opts.ephemeral + + this.nodes = tos() + this.bucket = new KBucket({ localNodeId: this.id }) + this.bucket.on('ping', this._onnodeping.bind(this)) + this.bootstrapNodes = [].concat(opts.bootstrap || []).map(parsePeer) + + this.socket.on('listening', this.emit.bind(this, 'listening')) + this.socket.on('close', this.emit.bind(this, 'close')) + + const queryId = this.ephemeral ? null : this.id + const io = new IO(this.socket, queryId, this) + + this._io = io + this._commands = new Map() + this._tick = 0 + this._tickInterval = setInterval(this._ontick.bind(this), 5000) + + process.nextTick(this.bootstrap.bind(this)) } - process.nextTick(function () { - self.bootstrap() - }) - - function rotateSecrets () { - self._rotateSecrets() + _ontick () { + this._tick++ + if ((this._tick & 7) === 0) this._pingSome() } - function onrequest (request, peer) { - self._onrequest(request, peer) + address () { + return this.socket.address() } - function onresponse (response, peer) { - self._onresponse(response, peer) + command (name, opts) { + this._commands.set(name, { + inputEncoding: codecs(opts.inputEncoding || opts.valueEncoding), + outputEncoding: codecs(opts.outputEncoding || opts.valueEncoding), + query: opts.query || queryNotSupported, + update: opts.update || updateNotSupported + }) } - function onnodeping (oldContacts, newContact) { - self._onnodeping(oldContacts, newContact) + ready (onready) { + if (!this.bootstrapped) this.once('ready', onready) + else onready() } - function onclose () { - while (self._pendingRequests.length) { - self._pendingRequests.shift().callback(new Error('Request cancelled')) + onrequest (type, message, peer) { + if (validateId(message.id)) { + this._addNode(message.id, peer, null) } - self.emit('close') - } - function tick () { - self._tick++ - if ((self._tick & 7) === 0) self._pingSome() - } -} + switch (message.command) { + case '_ping': + return this._onping(message, peer) -inherits(DHT, events.EventEmitter) + case '_find_node': + return this._onfindnode(message, peer) -DHT.prototype.ready = function (cb) { - if (!this._bootstrapped) this.once('ready', cb) - else cb() -} + case '_holepunch': + return this._onholepunch(message, peer) -DHT.prototype.query = function (query, opts, cb) { - if (typeof opts === 'function') return this.query(query, null, opts) - return collect(queryStream(this, query, opts), cb) -} - -DHT.prototype.update = function (query, opts, cb) { - if (typeof opts === 'function') return this.update(query, null, opts) - if (!opts) opts = {} - if (opts.query) opts.verbose = true - opts.token = true - return collect(queryStream(this, query, opts), cb) -} - -DHT.prototype._pingSome = function () { - var cnt = this.inflightQueries > 2 ? 1 : 3 - var oldest = this.nodes.oldest - - while (cnt--) { - if (!oldest || this._tick - oldest.tick < 3) continue - this._check(oldest) - oldest = oldest.next + default: + return this._oncommand(type, message, peer) + } } -} -DHT.prototype.holepunch = function (peer, referrer, cb) { - peer = parseAddr(peer) - referrer = parseAddr(referrer) - - this._ping(peer, noop) - this._holepunch(peer, referrer, cb) -} - -DHT.prototype.ping = function (peer, cb) { - this._ping(parseAddr(peer), function (err, res, peer) { - if (err) return cb(err) - var rinfo = decodePeer(res.value) - if (!rinfo) return cb(new Error('Invalid pong')) - cb(null, rinfo, {port: peer.port, host: peer.host, id: res.id}) - }) -} + _onping (message, peer) { + if (message.value && !this.id.equals(message.value)) return + this._io.response(message, peers.encode([ peer ]), null, peer) + } -DHT.prototype.toArray = function () { - return this.bucket.toArray() -} + _onholepunch (message, peer) { + const value = decodeHolepunch(message.value) + if (!value) return + + if (value.to) { + const to = decodePeer(value.to) + if (!to || samePeer(to, peer)) return + message.id = this._io.id + message.value = Holepunch.encode({ from: peers.encode([ peer ]) }) + this.emit('holepunch', peer, to) + this._io.send(Message.encode(message), to) + return + } -DHT.prototype.destroy = function () { - clearInterval(this._secretsInterval) - clearInterval(this._tickInterval) - this.socket.destroy() -} + if (value.from) { + const from = decodePeer(value.from) + if (from) peer = from + } -DHT.prototype.address = function () { - return this.socket.address() -} + this._io.response(message, null, null, peer) + } -DHT.prototype._rotateSecrets = function () { - var secret = randomBytes(32) - this._secrets[1] = this._secrets[0] - this._secrets[0] = secret -} + _onfindnode (message, peer) { + if (!validateId(message.target)) return -DHT.prototype.bootstrap = function (cb) { - var self = this + const closerNodes = nodes.encode(this.bucket.closest(message.target, 20)) + this._io.response(message, null, closerNodes, peer) + } - if (!this._bootstrap.length) return process.nextTick(done) + _oncommand (type, message, peer) { + if (!message.target) return - var backgroundCon = Math.min(self.concurrency, Math.max(2, Math.floor(self.concurrency / 8))) - var qs = this.query({ - command: '_find_node', - target: this.id - }) + const self = this + const cmd = this._commands.get(message.command) - qs.on('data', update) - qs.on('error', onerror) - qs.on('end', done) + if (!cmd) return reply(UNSUPPORTED_COMMAND) - update() + const query = { + type, + command: message.command, + node: peer, + target: message.target, + value: cmd.inputEncoding.decode(message.value) + } - function onerror (err) { - if (cb) cb(err) - } + if (type === IO.UPDATE) cmd.update(query, reply) + else cmd.query(query, reply) - function done () { - if (!self._bootstrapped) { - self._bootstrapped = true - self.emit('ready') + function reply (err, value) { + const closerNodes = nodes.encode(self.bucket.closest(message.target, 20)) + if (err) return self._io.error(message, err, closerNodes, peer) + self._io.response(message, value && cmd.outputEncoding.encode(value), closerNodes, peer) } - if (cb) cb() } - function update () { - qs._concurrency = self.inflightQueries === 1 ? self.concurrency : backgroundCon + onresponse (message, peer) { + if (validateId(message.id)) { + this._addNode(message.id, peer, message.roundtripToken) + } } -} - -DHT.prototype._ping = function (peer, cb) { - this._request({command: '_ping', id: this._queryId}, peer, false, cb) -} - -DHT.prototype._holepunch = function (peer, referrer, cb) { - // Expects the caller to have already sent a message to peer to open the firewall session - this._request({command: '_ping', id: this._queryId, forwardRequest: encodePeer(peer)}, referrer, false, cb) -} -DHT.prototype._request = function (request, peer, important, cb) { - if (this.socket.inflight >= this.concurrency || this._pendingRequests.length) { - this._pendingRequests.push({request: request, peer: peer, callback: cb}) - } else { - this.socket.request(request, peer, cb) + holepunch (peer, cb) { + if (!peer.referrer) throw new Error('peer.referrer is required') + this._io.query('_holepunch', null, null, peer, cb) } -} - -DHT.prototype._onrequest = function (request, peer) { - if (validateId(request.id)) this._addNode(request.id, peer, request.roundtripToken) - if (request.roundtripToken) { - if (!request.roundtripToken.equals(this._token(peer, 0))) { - if (!request.roundtripToken.equals(this._token(peer, 1))) { - request.roundtripToken = null - } - } + destroy () { + this.destroyed = true + this._io.destroy() + clearInterval(this._tickInterval) } - if (request.forwardRequest) { - this._forwardRequest(request, peer) - return + ping (peer, cb) { + this._io.query('_ping', null, peer.id, peer, function (err, res) { + if (err) return cb(err) + if (res.error) return cb(new Error(res.error)) + const pong = decodePeer(res.value) + if (!pong) return cb(new Error('Invalid pong')) + cb(null, pong) + }) } - if (request.forwardResponse) peer = this._forwardResponse(request, peer) + _addNode (id, peer, token) { + if (id.equals(this.id)) return - switch (request.command) { - case '_ping': return this._onping(request, peer) - case '_find_node': return this._onfindnode(request, peer) - } + var node = this.bucket.get(id) + const fresh = !node - this._onquery(request, peer) -} + if (!node) node = {} -DHT.prototype._forwardResponse = function (request, peer) { - if (request.command !== '_ping') return // only allow ping for now + node.id = id + node.port = peer.port + node.host = peer.host + if (token) node.roundtripToken = token + node.tick = this._tick - try { - var from = peers.decode(request.forwardResponse)[0] - if (!from) return - } catch (err) { - return + if (!fresh) this.nodes.remove(node) + this.nodes.add(node) + this.bucket.add(node) + if (fresh) this.emit('add-node', node) } - from.request = true - from.tid = peer.tid - - return from -} - -DHT.prototype._forwardRequest = function (request, peer) { - if (request.command !== '_ping') return // only allow ping forwards right now - - try { - var to = peers.decode(request.forwardRequest)[0] - if (!to) return - } catch (err) { - return + _removeNode (node) { + this.nodes.remove(node) + this.bucket.remove(node.id) + this.emit('remove-node') } - this.emit('holepunch', peer, to) - request.forwardRequest = null - request.forwardResponse = encodePeer(peer) - this.socket.forwardRequest(request, peer, to) -} - -DHT.prototype._onquery = function (request, peer) { - if (!validateId(request.target)) return - - var self = this - var query = { - node: { - id: request.id, - port: peer.port, - host: peer.host - }, - command: request.command, - target: request.target, - value: request.value, - roundtripToken: request.roundtripToken + _token (peer, i) { + return blake2b.batch([ + this._secrets[i], + Buffer.from(peer.host) + ]) } - var method = request.roundtripToken ? 'update' : 'query' + _onnodeping (oldContacts, newContact) { + // if bootstrapping, we've recently pinged all nodes + if (!this.bootstrapped) return + + const reping = [] - if (!this.emit(method + ':' + request.command, query, callback) && !this.emit(method, query, callback)) callback() + for (var i = 0; i < oldContacts.length; i++) { + const old = oldContacts[i] - function callback (err, value) { - if (err) return + // check if we recently talked to this peer ... + if (this._tick === old.tick) { + this.bucket.add(oldContacts[i]) + continue + } - var res = { - id: self._queryId, - value: value || null, - nodes: nodes.encode(self.bucket.closest(request.target, 20)), - roundtripToken: self._token(peer, 0) + reping.push(old) } - self.socket.response(res, peer) + if (reping.length) this._reping(reping, newContact) } -} - -DHT.prototype._onresponse = function (response, peer) { - if (validateId(response.id)) this._addNode(response.id, peer, response.roundtripToken) - while (this.socket.inflight < this.concurrency && this._pendingRequests.length) { - var next = this._pendingRequests.shift() - this.socket.request(next.request, next.peer, next.callback) + _check (node) { + const self = this + this.ping(node, function (err) { + if (err) self._removeNode(node) + }) } -} -DHT.prototype._onping = function (request, peer) { - var res = { - id: this._queryId, - value: encodePeer(peer), - roundtripToken: this._token(peer, 0) - } + _reping (oldContacts, newContact) { + const self = this - this.socket.response(res, peer) -} + ping() -DHT.prototype._onfindnode = function (request, peer) { - if (!validateId(request.target)) return + function ping () { + const next = oldContacts.shift() + if (!next) return + self._io.queryImmediately('_ping', null, next.id, next, afterPing) + } - var res = { - id: this._queryId, - nodes: nodes.encode(this.bucket.closest(request.target, 20)), - roundtripToken: this._token(peer, 0) + function afterPing (err, res, node) { + if (!err) return ping() + self._removeNode(node) + self.bucket.add(newContact) + } } - this.socket.response(res, peer) -} - -DHT.prototype._onnodeping = function (oldContacts, newContact) { - if (!this._bootstrapped) return // bootstrapping, we've recently pinged all nodes + _pingSome () { + var cnt = this.inflightQueries > 2 ? 1 : 3 + var oldest = this.nodes.oldest - var reping = [] - - for (var i = 0; i < oldContacts.length; i++) { - var old = oldContacts[i] - - if (this._tick - old.tick < 3) { // less than 10s since we talked to this peer ... - this.bucket.add(oldContacts[i]) - continue + while (cnt--) { + if (!oldest || this._tick === oldest.tick) continue + this._check(oldest) + oldest = oldest.next } - - reping.push(old) } - if (reping.length) this._reping(reping, newContact) -} - -DHT.prototype._check = function (node) { - var self = this - this._request({command: '_ping', id: this._queryId}, node, false, function (err) { - if (err) self._removeNode(node) - }) -} - -DHT.prototype._reping = function (oldContacts, newContact) { - var self = this - var next = null + query (command, target, value, cb) { + if (typeof value === 'function') return this.query(command, target, null, value) + return collect(this.runCommand(command, target, value, { query: true, update: false }), cb) + } - ping() + update (command, target, value, cb) { + if (typeof value === 'function') return this.update(command, target, null, value) + return collect(this.runCommand(command, target, value, { query: false, update: true }), cb) + } - function ping () { - next = oldContacts.shift() - if (next) self._request({command: '_ping', id: self._queryId}, next, true, afterPing) + queryAndUpdate (command, target, value, cb) { + if (typeof value === 'function') return this.queryAndUpdate(command, target, null, value) + return collect(this.runCommand(command, target, value, { query: true, update: true }), cb) } - function afterPing (err) { - if (!err) return ping() + runCommand (command, target, value, opts) { + return new QueryStream(this, command, target, value, opts) + } - self._removeNode(next) - self.bucket.add(newContact) + listen (port, addr, cb) { + if (typeof port === 'function') return this.listen(0, null, port) + if (typeof addr === 'function') return this.listen(port, null, addr) + if (cb) this.once('listening', cb) + this.socket.bind(port, addr) } -} -DHT.prototype._token = function (peer, i) { - return blake2b.batch([this._secrets[i], Buffer.from(peer.host)]) -} + bootstrap (cb) { + const self = this + const backgroundCon = Math.min(this.concurrency, Math.max(2, Math.floor(this.concurrency / 8))) -DHT.prototype._addNode = function (id, peer, token) { - if (id.equals(this.id)) return + if (!this.bootstrapNodes.length) return process.nextTick(done) - var node = this.bucket.get(id) - var fresh = !node + const qs = this.query('_find_node', this.id) - if (!node) node = {} + qs.on('data', update) + qs.on('error', onerror) + qs.on('end', done) - node.id = id - node.port = peer.port - node.host = peer.host - node.roundtripToken = token - node.tick = this._tick + update() - if (!fresh) this.nodes.remove(node) - this.nodes.add(node) + function onerror (err) { + if (cb) cb(err) + } + + function done () { + if (!self.bootstrapped) { + self.bootstrapped = true + self.emit('ready') + } + if (cb) cb() + } - this.bucket.add(node) - if (fresh) this.emit('add-node', node) + function update () { + qs._concurrency = self.inflightQueries === 1 ? self.concurrency : backgroundCon + } + } } -DHT.prototype._removeNode = function (node) { - this.nodes.remove(node) - this.bucket.remove(node.id) - this.emit('remove-node', node) +exports.QUERY = DHT.QUERY = IO.QUERY +exports.UPDATE = DHT.UPDATE = IO.UPDATE +exports.DHT = DHT + +function validateId (id) { + return id && id.length === 32 } -DHT.prototype.listen = function (port, cb) { - this.socket.listen(port, cb) +function randomBytes (n) { + const buf = Buffer.allocUnsafe(n) + sodium.randombytes_buf(buf) + return buf } -function encodePeer (peer) { - return peer && peers.encode([peer]) +function decodeHolepunch (buf) { + try { + return Holepunch.decode(buf) + } catch (err) { + return null + } } function decodePeer (buf) { try { - return buf && peers.decode(buf)[0] + const p = peers.decode(buf)[0] + if (!p) throw new Error('No peer in buffer') + return p } catch (err) { return null } } -function parseAddr (addr) { - if (typeof addr === 'object' && addr) return addr - if (typeof addr === 'number') return parseAddr(':' + addr) - if (addr[0] === ':') return parseAddr('127.0.0.1' + addr) - return {port: Number(addr.split(':')[1] || 3282), host: addr.split(':')[0]} -} +function parsePeer (peer) { + if (typeof peer === 'object' && peer) return peer + if (typeof peer === 'number') return parsePeer(':' + peer) + if (peer[0] === ':') return parsePeer('127.0.0.1' + peer) -function validateId (id) { - return id && id.length === 32 + const parts = peer.split(':') + return { + host: parts[0], + port: parseInt(parts[1], 10) + } } -function arbiter (incumbant, candidate) { - return candidate +function samePeer (a, b) { + return a.port === b.port && a.host === b.host } -function randomBytes (n) { - var buf = Buffer.allocUnsafe(n) - sodium.randombytes_buf(buf) - return buf +function updateNotSupported (query, cb) { + cb(new Error('Update not supported')) } -function noop () {} +function queryNotSupported (query, cb) { + cb(null, null) +} diff --git a/blake2b.js b/lib/blake2b.js similarity index 100% rename from blake2b.js rename to lib/blake2b.js diff --git a/lib/io.js b/lib/io.js new file mode 100644 index 0000000..082d1a2 --- /dev/null +++ b/lib/io.js @@ -0,0 +1,299 @@ +const { Message, Holepunch, TYPE } = require('./messages') +const blake2b = require('./blake2b') +const peers = require('ipv4-peers') +const sodium = require('sodium-universal') + +const QUERY = Symbol('QUERY') +const UPDATE = Symbol('UPDATE') + +const ECANCELLED = new Error('Request cancelled') +const ETIMEDOUT = new Error('Request timed out') + +ETIMEDOUT.code = 'ETIMEDOUT' +ECANCELLED.code = 'ECANCELLED' + +const TRIES = 3 + +class IO { + constructor (socket, id, ctx) { + this.id = id + this.socket = socket + this.inflight = [] + + this._ctx = ctx + this._rid = (Math.random() * 65536) | 0 + this._requests = new Array(65536) + this._pending = [] + this._secrets = [ randomBytes(32), randomBytes(32) ] + this._tickInterval = setInterval(this._ontick.bind(this), 250) + this._rotateInterval = setInterval(this._onrotate.bind(this), 300000) + + socket.on('message', this._onmessage.bind(this)) + } + + _token (peer, i) { + return blake2b.batch([ + this._secrets[i], + Buffer.from(peer.host) + ]) + } + + _free () { + const rid = this._rid++ + if (this._rid === 65536) this._rid = 0 + return rid + } + + /* + + R + / \ + A B + + A sent a message to B that failed + + It could be that the message got dropped + or that it needs holepunching + + To retry + + resend(req, A -> B) + fire_and_forget({ _holepunch, to: B }, A -> R) + + R.onholepunch { to: B } => fire_and_forget({ _holepunch, from: A }, R -> B) + B.onholepunch { from: A } => fire_and_forget({ _holepunch }, B -> A) + + A and B is now holepunched and the session has been retried as well + + */ + + _holepunch (req) { + const rid = req.message.command === '_holepunch' + ? req.rid + : this._free() + + const punch = { + type: req.message.type, + rid, + command: '_holepunch', + value: Holepunch.encode({ + to: peers.encode([ req.peer ]) + }) + } + + this.send(Message.encode(punch), req.peer.referrer) + } + + _retry (req) { + req.timeout = 4 + this.send(req.buffer, req.peer) + // if referrer is avail, try holepunching automatically + if (req.peer.referrer) this._holepunch(req) + } + + _onmessage (buf, rinfo) { + const message = decodeMessage(buf) + if (!message) return + + const peer = { port: rinfo.port, host: rinfo.address } + + switch (message.type) { + case TYPE.RESPONSE: + this._ctx.onresponse(message, peer) + this._finish(message.rid, null, message, peer) + break + + case TYPE.QUERY: + this._ctx.onrequest(QUERY, message, peer) + break + + case TYPE.UPDATE: + const rt = message.roundtripToken + if (!rt || (!rt.equals(this._token(peer, 0)) && !rt.equals(this._token(peer, 1)))) return + this._ctx.onrequest(UPDATE, message, peer) + break + } + } + + _finish (rid, err, val, peer) { + const req = this._requests[rid] + if (!req) return + + this._requests[rid] = undefined + const top = this.inflight[this.inflight.length - 1] + this.inflight[top.index = req.index] = top + this.inflight.pop() + + const type = req.message.type === TYPE.QUERY + ? QUERY + : UPDATE + + req.callback(err, val, peer, req.message, req.peer, type) + + while (this._pending.length && this.inflight.length < this._ctx.concurrency) { + const { message, peer, callback } = this._pending.shift() + this._requestImmediately(message, peer, callback) + } + } + + _request (message, peer, callback) { + // Should we wait to send? + if (this._pending.length || (this.inflight.length >= this._ctx.concurrency)) { + this._pending.push({ message, peer, callback }) + } else { + this._requestImmediately(message, peer, callback) + } + } + + _requestImmediately (message, peer, callback) { + const rid = message.rid = this._free() + const buffer = Message.encode(message) + + const req = { + rid, + index: this.inflight.length, + callback, + message, + buffer, + peer, + timeout: 4, + tries: 0 + } + + this._requests[rid] = req + this.inflight.push(req) + this.send(buffer, peer) + + // if sending a holepunch cmd, forward it right away + if (message.command === '_holepunch') this._holepunch(req) + } + + _cancel (rid, err) { + this._finish(rid, err || ECANCELLED, null, null) + } + + _onrotate () { + this._secrets[1] = this._secrets[0] + this._secrets[0] = randomBytes(32) + } + + _ontick () { + for (var i = this.inflight.length - 1; i >= 0; i--) { + const req = this.inflight[i] + + if (req.timeout === 2 && ++req.tries < TRIES) { + this._retry(req) + continue + } + + if (--req.timeout) { + continue + } + + this._cancel(req.rid, ETIMEDOUT) + } + } + + send (buffer, peer) { + this.socket.send(buffer, 0, buffer.length, peer.port, peer.host) + } + + destroy () { + clearInterval(this._rotateInterval) + clearInterval(this._tickInterval) + + this.socket.close() + + const pending = this._pending + this._pending = [] + + for (const req of pending) req.callback(ECANCELLED) + for (const req of this.inflight) this._cancel(req.rid) + } + + response (request, value, closerNodes, peer) { + const message = { + type: TYPE.RESPONSE, + rid: request.rid, + id: this.id, + closerNodes, + roundtripToken: this._token(peer, 0), + value + } + + this.send(Message.encode(message), peer) + } + + error (request, error, closerNodes, peer) { + const message = { + type: TYPE.RESPONSE, + rid: request.rid, + id: this.id, + closerNodes, + error: error.message + } + + this.send(Message.encode(message), peer) + } + + query (command, target, value, peer, callback) { + if (!callback) callback = noop + + this._request({ + type: TYPE.QUERY, + rid: 0, + id: this.id, + target, + command, + value + }, peer, callback) + } + + queryImmediately (command, target, value, peer, callback) { + if (!callback) callback = noop + + this._requestImmediately({ + type: TYPE.QUERY, + rid: 0, + id: this.id, + target, + command, + value + }, peer, callback) + } + + update (command, target, value, peer, callback) { + if (!callback) callback = noop + + this._request({ + type: TYPE.UPDATE, + rid: 0, + id: this.id, + roundtripToken: peer.roundtripToken, + target, + command, + value + }, peer, callback) + } +} + +IO.QUERY = QUERY +IO.UPDATE = UPDATE + +module.exports = IO + +function noop () {} + +function decodeMessage (buf) { + try { + return Message.decode(buf) + } catch (err) { + return null + } +} + +function randomBytes (n) { + const buf = Buffer.allocUnsafe(32) + sodium.randombytes_buf(buf) + return buf +} diff --git a/messages.js b/lib/messages.js similarity index 51% rename from messages.js rename to lib/messages.js index fb57758..04154e1 100644 --- a/messages.js +++ b/lib/messages.js @@ -1,4 +1,4 @@ -// This file is auto generated by the protocol-buffers cli tool +// This file is auto generated by the protocol-buffers compiler /* eslint-disable quotes */ /* eslint-disable indent */ @@ -10,60 +10,46 @@ var encodings = require('protocol-buffers-encodings') var varint = encodings.varint var skip = encodings.skip -var Request = exports.Request = { +exports.TYPE = { + "QUERY": 1, + "UPDATE": 2, + "RESPONSE": 3 +} + +var Holepunch = exports.Holepunch = { buffer: true, encodingLength: null, encode: null, decode: null } -var Response = exports.Response = { +var Message = exports.Message = { buffer: true, encodingLength: null, encode: null, decode: null } -defineRequest() -defineResponse() +defineHolepunch() +defineMessage() -function defineRequest () { +function defineHolepunch () { var enc = [ - encodings.string, encodings.bytes ] - Request.encodingLength = encodingLength - Request.encode = encode - Request.decode = decode + Holepunch.encodingLength = encodingLength + Holepunch.encode = encode + Holepunch.decode = decode function encodingLength (obj) { var length = 0 - if (!defined(obj.command)) throw new Error("command is required") - var len = enc[0].encodingLength(obj.command) - length += 1 + len - if (defined(obj.id)) { - var len = enc[1].encodingLength(obj.id) - length += 1 + len - } - if (defined(obj.target)) { - var len = enc[1].encodingLength(obj.target) + if (defined(obj.from)) { + var len = enc[0].encodingLength(obj.from) length += 1 + len } - if (defined(obj.forwardRequest)) { - var len = enc[1].encodingLength(obj.forwardRequest) - length += 1 + len - } - if (defined(obj.forwardResponse)) { - var len = enc[1].encodingLength(obj.forwardResponse) - length += 1 + len - } - if (defined(obj.roundtripToken)) { - var len = enc[1].encodingLength(obj.roundtripToken) - length += 1 + len - } - if (defined(obj.value)) { - var len = enc[1].encodingLength(obj.value) + if (defined(obj.to)) { + var len = enc[0].encodingLength(obj.to) length += 1 + len } return length @@ -73,39 +59,15 @@ function defineRequest () { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) var oldOffset = offset - if (!defined(obj.command)) throw new Error("command is required") - buf[offset++] = 10 - enc[0].encode(obj.command, buf, offset) - offset += enc[0].encode.bytes - if (defined(obj.id)) { + if (defined(obj.from)) { buf[offset++] = 18 - enc[1].encode(obj.id, buf, offset) - offset += enc[1].encode.bytes + enc[0].encode(obj.from, buf, offset) + offset += enc[0].encode.bytes } - if (defined(obj.target)) { + if (defined(obj.to)) { buf[offset++] = 26 - enc[1].encode(obj.target, buf, offset) - offset += enc[1].encode.bytes - } - if (defined(obj.forwardRequest)) { - buf[offset++] = 34 - enc[1].encode(obj.forwardRequest, buf, offset) - offset += enc[1].encode.bytes - } - if (defined(obj.forwardResponse)) { - buf[offset++] = 42 - enc[1].encode(obj.forwardResponse, buf, offset) - offset += enc[1].encode.bytes - } - if (defined(obj.roundtripToken)) { - buf[offset++] = 50 - enc[1].encode(obj.roundtripToken, buf, offset) - offset += enc[1].encode.bytes - } - if (defined(obj.value)) { - buf[offset++] = 58 - enc[1].encode(obj.value, buf, offset) - offset += enc[1].encode.bytes + enc[0].encode(obj.to, buf, offset) + offset += enc[0].encode.bytes } encode.bytes = offset - oldOffset return buf @@ -117,18 +79,11 @@ function defineRequest () { if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid") var oldOffset = offset var obj = { - command: "", - id: null, - target: null, - forwardRequest: null, - forwardResponse: null, - roundtripToken: null, - value: null + from: null, + to: null } - var found0 = false while (true) { if (end <= offset) { - if (!found0) throw new Error("Decoded message is not valid") decode.bytes = offset - oldOffset return obj } @@ -136,34 +91,13 @@ function defineRequest () { offset += varint.decode.bytes var tag = prefix >> 3 switch (tag) { - case 1: - obj.command = enc[0].decode(buf, offset) - offset += enc[0].decode.bytes - found0 = true - break case 2: - obj.id = enc[1].decode(buf, offset) - offset += enc[1].decode.bytes + obj.from = enc[0].decode(buf, offset) + offset += enc[0].decode.bytes break case 3: - obj.target = enc[1].decode(buf, offset) - offset += enc[1].decode.bytes - break - case 4: - obj.forwardRequest = enc[1].decode(buf, offset) - offset += enc[1].decode.bytes - break - case 5: - obj.forwardResponse = enc[1].decode(buf, offset) - offset += enc[1].decode.bytes - break - case 6: - obj.roundtripToken = enc[1].decode(buf, offset) - offset += enc[1].decode.bytes - break - case 7: - obj.value = enc[1].decode(buf, offset) - offset += enc[1].decode.bytes + obj.to = enc[0].decode(buf, offset) + offset += enc[0].decode.bytes break default: offset = skip(prefix & 7, buf, offset) @@ -172,31 +106,52 @@ function defineRequest () { } } -function defineResponse () { +function defineMessage () { var enc = [ - encodings.bytes + encodings.enum, + encodings.varint, + encodings.bytes, + encodings.string ] - Response.encodingLength = encodingLength - Response.encode = encode - Response.decode = decode + Message.encodingLength = encodingLength + Message.encode = encode + Message.decode = decode function encodingLength (obj) { var length = 0 + if (!defined(obj.type)) throw new Error("type is required") + var len = enc[0].encodingLength(obj.type) + length += 1 + len + if (!defined(obj.rid)) throw new Error("rid is required") + var len = enc[1].encodingLength(obj.rid) + length += 1 + len if (defined(obj.id)) { - var len = enc[0].encodingLength(obj.id) + var len = enc[2].encodingLength(obj.id) length += 1 + len } - if (defined(obj.nodes)) { - var len = enc[0].encodingLength(obj.nodes) + if (defined(obj.target)) { + var len = enc[2].encodingLength(obj.target) length += 1 + len } - if (defined(obj.value)) { - var len = enc[0].encodingLength(obj.value) + if (defined(obj.closerNodes)) { + var len = enc[2].encodingLength(obj.closerNodes) length += 1 + len } if (defined(obj.roundtripToken)) { - var len = enc[0].encodingLength(obj.roundtripToken) + var len = enc[2].encodingLength(obj.roundtripToken) + length += 1 + len + } + if (defined(obj.command)) { + var len = enc[3].encodingLength(obj.command) + length += 1 + len + } + if (defined(obj.error)) { + var len = enc[3].encodingLength(obj.error) + length += 1 + len + } + if (defined(obj.value)) { + var len = enc[2].encodingLength(obj.value) length += 1 + len } return length @@ -206,25 +161,48 @@ function defineResponse () { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) var oldOffset = offset + if (!defined(obj.type)) throw new Error("type is required") + buf[offset++] = 8 + enc[0].encode(obj.type, buf, offset) + offset += enc[0].encode.bytes + if (!defined(obj.rid)) throw new Error("rid is required") + buf[offset++] = 16 + enc[1].encode(obj.rid, buf, offset) + offset += enc[1].encode.bytes if (defined(obj.id)) { - buf[offset++] = 10 - enc[0].encode(obj.id, buf, offset) - offset += enc[0].encode.bytes + buf[offset++] = 26 + enc[2].encode(obj.id, buf, offset) + offset += enc[2].encode.bytes } - if (defined(obj.nodes)) { - buf[offset++] = 18 - enc[0].encode(obj.nodes, buf, offset) - offset += enc[0].encode.bytes + if (defined(obj.target)) { + buf[offset++] = 34 + enc[2].encode(obj.target, buf, offset) + offset += enc[2].encode.bytes } - if (defined(obj.value)) { - buf[offset++] = 26 - enc[0].encode(obj.value, buf, offset) - offset += enc[0].encode.bytes + if (defined(obj.closerNodes)) { + buf[offset++] = 42 + enc[2].encode(obj.closerNodes, buf, offset) + offset += enc[2].encode.bytes } if (defined(obj.roundtripToken)) { - buf[offset++] = 34 - enc[0].encode(obj.roundtripToken, buf, offset) - offset += enc[0].encode.bytes + buf[offset++] = 50 + enc[2].encode(obj.roundtripToken, buf, offset) + offset += enc[2].encode.bytes + } + if (defined(obj.command)) { + buf[offset++] = 58 + enc[3].encode(obj.command, buf, offset) + offset += enc[3].encode.bytes + } + if (defined(obj.error)) { + buf[offset++] = 66 + enc[3].encode(obj.error, buf, offset) + offset += enc[3].encode.bytes + } + if (defined(obj.value)) { + buf[offset++] = 74 + enc[2].encode(obj.value, buf, offset) + offset += enc[2].encode.bytes } encode.bytes = offset - oldOffset return buf @@ -236,13 +214,21 @@ function defineResponse () { if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid") var oldOffset = offset var obj = { + type: 1, + rid: 0, id: null, - nodes: null, - value: null, - roundtripToken: null + target: null, + closerNodes: null, + roundtripToken: null, + command: "", + error: "", + value: null } + var found0 = false + var found1 = false while (true) { if (end <= offset) { + if (!found0 || !found1) throw new Error("Decoded message is not valid") decode.bytes = offset - oldOffset return obj } @@ -251,20 +237,42 @@ function defineResponse () { var tag = prefix >> 3 switch (tag) { case 1: - obj.id = enc[0].decode(buf, offset) + obj.type = enc[0].decode(buf, offset) offset += enc[0].decode.bytes + found0 = true break case 2: - obj.nodes = enc[0].decode(buf, offset) - offset += enc[0].decode.bytes + obj.rid = enc[1].decode(buf, offset) + offset += enc[1].decode.bytes + found1 = true break case 3: - obj.value = enc[0].decode(buf, offset) - offset += enc[0].decode.bytes + obj.id = enc[2].decode(buf, offset) + offset += enc[2].decode.bytes break case 4: - obj.roundtripToken = enc[0].decode(buf, offset) - offset += enc[0].decode.bytes + obj.target = enc[2].decode(buf, offset) + offset += enc[2].decode.bytes + break + case 5: + obj.closerNodes = enc[2].decode(buf, offset) + offset += enc[2].decode.bytes + break + case 6: + obj.roundtripToken = enc[2].decode(buf, offset) + offset += enc[2].decode.bytes + break + case 7: + obj.command = enc[3].decode(buf, offset) + offset += enc[3].decode.bytes + break + case 8: + obj.error = enc[3].decode(buf, offset) + offset += enc[3].decode.bytes + break + case 9: + obj.value = enc[2].decode(buf, offset) + offset += enc[2].decode.bytes break default: offset = skip(prefix & 7, buf, offset) diff --git a/lib/query-stream.js b/lib/query-stream.js new file mode 100644 index 0000000..ab85879 --- /dev/null +++ b/lib/query-stream.js @@ -0,0 +1,264 @@ +const { Readable } = require('stream') +const nodes = require('ipv4-peers').idLength(32) +const QueryTable = require('./query-table') + +const BOOTSTRAPPING = Symbol('BOOTSTRAPPING') +const MOVING_CLOSER = Symbol('MOVING_CLOSER') +const UPDATING = Symbol('UPDATING') +const FINALIZED = Symbol('FINALIZED') + +class QueryStream extends Readable { + constructor (node, command, target, value, opts) { + if (!opts) opts = {} + if (!opts.concurrency) opts.concurrency = opts.highWaterMark || node.concurrency + + super({ + objectMode: true, + highWaterMark: opts.concurrency + }) + + const cmd = node._commands.get(command) + + this.command = command + this.target = target + this.value = cmd ? cmd.inputEncoding.encode(value) : (value || null) + this.update = !!opts.update + this.query = !!opts.query || !opts.update + this.destroyed = false + this.inflight = 0 + this.responses = 0 + this.updates = 0 + this.table = opts.table || new QueryTable(node.id, target) + + node.inflightQueries++ + + this._status = opts.table ? MOVING_CLOSER : BOOTSTRAPPING + this._node = node + this._concurrency = opts.concurrency + this._callback = this._onresponse.bind(this) + this._map = identity + this._outputEncoding = cmd ? cmd.outputEncoding : null + } + + map (fn) { + this._map = fn + return this + } + + _onresponse (err, message, peer, request, to, type) { + this.inflight-- + + if (err && to.id) { + // Request, including retries, failed completely + // Remove the "to" node. + const node = this._node.bucket.get(to.id) + if (node) this._node._removeNode(node) + } + + if (this._status === FINALIZED) { + if (!this.inflight) { + if (this.destroyed) this.emit('close') + else this.destroy() + } + return + } + + if (err) { + this.emit('warning', err) + this._readMaybe() + return + } + + this.responses++ + this.table.addVerified(message, peer) + + if (this._status === MOVING_CLOSER) { + const candidates = decodeNodes(message.closerNodes) + for (var i = 0; i < candidates.length; i++) { + this.table.addUnverified(candidates[i], peer) + } + } else if (this._status === UPDATING) { + this.updates++ + } + + if (message.error) { + this.emit('warning', new Error(message.error)) + this._readMaybe() + return + } + + if (!this.query && this._status === MOVING_CLOSER) { + this._readMaybe() + return + } + + const value = this._outputEncoding + ? this._decodeOutput(message.value) + : message.value + + const data = this._map({ + type, + node: { + id: message.id, + port: peer.port, + host: peer.host + }, + value + }) + + if (!data) { + this._readMaybe() + return + } + + this.push(data) + } + + _decodeOutput (val) { + try { + return val && this._outputEncoding.decode(val) + } catch (err) { + return null + } + } + + _bootstrap () { + const table = this.table + const bootstrap = this._node.bucket.closest(table.target, table.k) + var i = 0 + + for (; i < bootstrap.length; i++) { + const b = bootstrap[i] + const node = { id: b.id, port: b.port, host: b.host } + table.addUnverified(node, null) + } + + const bootstrapNodes = this._node.bootstrapNodes + if (bootstrap.length < bootstrapNodes.length) { + for (i = 0; i < bootstrapNodes.length; i++) { + this._send(bootstrapNodes[i], true, false) + } + } + + this._status = MOVING_CLOSER + this._moveCloser() + } + + _sendAll (nodes, force, sendToken) { + var free = Math.max(0, this._concurrency - this._node._io.inflight.length) + var sent = 0 + + if (!free && !this.inflight) free = 1 + if (!free) return 0 + + for (var i = 0; i < nodes.length; i++) { + if (this._send(nodes[i], force, sendToken)) { + if (++sent === free) break + } + } + + return sent + } + + _send (node, force, isUpdate) { + if (!force) { + if (node.queried) return false + node.queried = true + } + + this.inflight++ + const io = this._node._io + + if (isUpdate) { + if (!node.roundtripToken) return this._callback(new Error('Roundtrip token is required')) + io.update(this.command, this.target, this.value, node, this._callback) + } else if (this.query) { + io.query(this.command, this.target, this.value, node, this._callback) + } else { + io.query('_find_node', this.target, null, node, this._callback) + } + + return true + } + + _sendUpdate () { + const sent = this._sendAll(this.table.closest, false, true) + if (sent || this.inflight) return + + this._finalize() + } + + _moveCloser () { + const table = this.table + const sent = this._sendAll(table.unverified, false, false) + if (sent || this.inflight) return + + if (this.update) { + for (var i = 0; i < table.closest.length; i++) { + table.closest[i].queried = false + } + this._status = UPDATING + this._sendUpdate() + } else { + this._finalize() + } + } + + _finalize () { + const status = this._status + if (status === FINALIZED) return + + this._status = FINALIZED + this._node.inflightQueries-- + + if (!this.responses && !this.destroyed) { + this.destroy(new Error('No nodes responded')) + } + if (status === UPDATING && !this.updates && !this.destroyed) { + this.destroy(new Error('No close nodes responded')) + } + + this.push(null) + } + + _readMaybe () { + if (!this.inflight || this._readableState.flowing === true) this._read() + } + + _read () { + if (this._node.destroyed) return + + switch (this._status) { + case BOOTSTRAPPING: return this._bootstrap() + case MOVING_CLOSER: return this._moveCloser() + case UPDATING: return this._sendUpdate() + case FINALIZED: return + } + + throw new Error('Unknown status: ' + this._status) + } + + destroy (err) { + if (this.destroyed) return + this.destroyed = true + + if (err) this.emit('error', err) + this._finalize() + if (!this.inflight) this.emit('close') + } +} + +module.exports = QueryStream + +function decodeNodes (buf) { + if (!buf) return [] + try { + return nodes.decode(buf) + } catch (err) { + return [] + } +} + +function identity (a) { + return a +} diff --git a/lib/query-table.js b/lib/query-table.js new file mode 100644 index 0000000..5748d88 --- /dev/null +++ b/lib/query-table.js @@ -0,0 +1,69 @@ +const xor = require('xor-distance') + +class QueryTable { + constructor (id, target) { + this.k = 20 + this.id = id + this.target = target + this.closest = [] + this.unverified = [] + } + + addUnverified (node, referrer) { + if (node.id.equals(this.id)) return + + node.distance = xor(this.target, node.id) + node.referrer = referrer + + insertSorted(node, this.k, this.unverified) + } + + addVerified (message, peer) { + if (!message.id || !message.roundtripToken || message.id.equals(this.id)) { + return + } + + var prev = getNode(message.id, this.unverified) + + if (!prev) { + prev = { + id: message.id, + host: peer.host, + port: peer.port, + distance: xor(message.id, this.target) + } + } + + prev.roundtripToken = message.roundtripToken + insertSorted(prev, this.k, this.closest) + } +} + +module.exports = QueryTable + +function getNode (id, list) { + // find id in the list. + // technically this would be faster with binary search (against distance) + // but this list is always small, so meh + + for (var i = 0; i < list.length; i++) { + if (list[i].id.equals(id)) return list[i] + } + + return null +} + +function insertSorted (node, max, list) { + if (list.length === max && !xor.lt(node.distance, list[max - 1].distance)) return + if (getNode(node.id, list)) return + + if (list.length < max) list.push(node) + else list[max - 1] = node + + var pos = list.length - 1 + while (pos && xor.gt(list[pos - 1].distance, node.distance)) { + list[pos] = list[pos - 1] + list[pos - 1] = node + pos-- + } +} diff --git a/package.json b/package.json index 9e3206f..0307611 100644 --- a/package.json +++ b/package.json @@ -1,38 +1,35 @@ { "name": "dht-rpc", "version": "3.0.1", - "description": "Make RPC calls over a Kademlia based DHT.", + "description": "Make RPC calls over a Kademlia based DHT", "main": "index.js", - "dependencies": { - "duplexify": "^3.5.0", - "inherits": "^2.0.3", - "ipv4-peers": "^1.1.1", - "k-bucket": "^5.0.0", - "protocol-buffers-encodings": "^1.1.0", - "readable-stream": "^2.1.5", - "sodium-universal": "^2.0.0", - "stream-collector": "^1.0.1", - "time-ordered-set": "^1.0.1", - "udp-request": "^1.3.0", - "xor-distance": "^1.0.0" - }, - "devDependencies": { - "protocol-buffers": "^3.2.1", - "standard": "^11.0.1", - "tape": "^4.9.0" + "scripts": { + "test": "standard && tape test.js", + "protobuf": "protocol-buffers schema.proto -o lib/messages.js" }, "repository": { "type": "git", - "url": "https://github.com/mafintosh/dht-rpc.git" - }, - "scripts": { - "test": "standard && tape test.js", - "protobuf": "protocol-buffers schema.proto -o messages.js" + "url": "git+https://github.com/mafintosh/dht-rpc.git" }, "author": "Mathias Buus (@mafintosh)", "license": "MIT", "bugs": { "url": "https://github.com/mafintosh/dht-rpc/issues" }, - "homepage": "https://github.com/mafintosh/dht-rpc" + "homepage": "https://github.com/mafintosh/dht-rpc#readme", + "devDependencies": { + "protocol-buffers": "^4.1.0", + "standard": "^12.0.1", + "tape": "^4.9.1" + }, + "dependencies": { + "codecs": "^1.2.1", + "ipv4-peers": "^1.1.1", + "k-bucket": "^5.0.0", + "protocol-buffers-encodings": "^1.1.0", + "sodium-universal": "^2.0.0", + "stream-collector": "^1.0.1", + "time-ordered-set": "^1.0.1", + "xor-distance": "^1.0.0" + } } diff --git a/query-stream.js b/query-stream.js deleted file mode 100644 index e79cc2f..0000000 --- a/query-stream.js +++ /dev/null @@ -1,304 +0,0 @@ -var stream = require('readable-stream') -var inherits = require('inherits') -var nodes = require('ipv4-peers').idLength(32) -var xor = require('xor-distance') - -var BLANK = Buffer.from([ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 -]) - -module.exports = QueryStream - -function QueryStream (dht, query, opts) { - if (!(this instanceof QueryStream)) return new QueryStream(dht, query, opts) - if (!opts) opts = {} - if (!opts.concurrency) opts.concurrency = opts.highWaterMark || dht.concurrency - if (!query.target) throw new Error('query.target is required') - - stream.Readable.call(this, {objectMode: true, highWaterMark: opts.concurrency}) - - var self = this - var nodes = opts.node || opts.nodes - - this.query = query - this.query.id = dht._queryId - this.target = query.target - this.token = !!opts.token - this.holepunching = opts.holepunching !== false - this.commits = 0 - this.responses = 0 - this.errors = 0 - this.destroyed = false - this.verbose = !!opts.verbose - - dht.inflightQueries++ - - this._dht = dht - this._committing = false - this._finalized = false - this._closest = opts.closest || [] - this._concurrency = opts.concurrency - this._updating = false - this._pending = nodes ? [].concat(nodes).map(copyNode) : [] - this._k = nodes ? Infinity : opts.k || 20 - this._inflight = 0 - this._moveCloser = !nodes - this._map = opts.map || echo - this._bootstrapped = !this._moveCloser - this._onresponse = onresponse - this._onresponseholepunch = onresponseholepunch - - function onresponseholepunch (err, res, peer, query) { - if (!err || !peer || !peer.referrer) self._callback(err, res, peer) - else self._holepunch(peer, query) - } - - function onresponse (err, res, peer) { - self._callback(err, res, peer) - } -} - -inherits(QueryStream, stream.Readable) - -QueryStream.prototype.destroy = function (err) { - if (this.destroyed) return - this.destroyed = true - this._finalize() - if (err) this.emit('error', err) - this.emit('close') -} - -QueryStream.prototype._finalize = function () { - if (this._finalized) return - this._finalized = true - this._dht.inflightQueries-- - if (!this.responses && !this.destroyed) this.destroy(new Error('No nodes responded')) - if (!this.commits && this._committing && !this.destroyed) this.destroy(new Error('No close nodes responded')) - this.push(null) -} - -QueryStream.prototype._bootstrap = function () { - this._bootstrapped = true - - var bootstrap = this._dht.bucket.closest(this.target, this._k) - var i = 0 - - for (i = 0; i < bootstrap.length; i++) { - var b = bootstrap[i] - this._addPending({id: b.id, port: b.port, host: b.host}, null) - } - - if (bootstrap.length < this._dht._bootstrap.length) { - for (i = 0; i < this._dht._bootstrap.length; i++) { - this._send(this._dht._bootstrap[i], true, false) - } - } -} - -QueryStream.prototype._readMaybe = function () { - if (this._readableState.flowing === true) this._read() -} - -QueryStream.prototype._sendTokens = function () { - if (this.destroyed) return - - var sent = this._sendAll(this._closest, false, true) - if (sent || this._inflight) return - - this._finalize() -} - -QueryStream.prototype._sendPending = function () { - if (this.destroyed) return - if (!this._bootstrapped) this._bootstrap() - - var sent = this._sendAll(this._pending, false, false) - if (sent || this._inflight) return - - if (this.token) { - for (var i = 0; i < this._closest.length; i++) this._closest[i].queried = false - this._committing = true - this._sendTokens() - } else { - this._finalize() - } -} - -QueryStream.prototype._read = function () { - if (this._committing) this._sendTokens() - else this._sendPending() -} - -QueryStream.prototype._holepunch = function (peer, query) { - var self = this - - this._dht._holepunch(peer, peer.referrer, function (err) { - if (err) return self._callback(err, null, peer) - self._dht._request(query, peer, false, self._onresponse) - }) -} - -QueryStream.prototype._callback = function (err, res, peer) { - this._inflight-- - if (this.destroyed) return - - if (err) { - if (res && res.id) { - var node = this._dht.bucket.get(res.id) - if (node) this._dht._removeNode(node) - } - this.errors++ - this.emit('warning', err) - this._readMaybe() - return - } - - this.responses++ - if (this._committing) this.commits++ - this._addClosest(res, peer) - - if (this._moveCloser) { - var candidates = decodeNodes(res.nodes) - for (var i = 0; i < candidates.length; i++) this._addPending(candidates[i], peer) - } - - if (!validateId(res.id) || (this.token && !this.verbose && !this._committing)) { - this._readMaybe() - return - } - - var data = this._map({ - node: { - id: res.id, - port: peer.port, - host: peer.host - }, - value: res.value - }) - - if (!data) { - this._readMaybe() - return - } - - this.push(data) -} - -QueryStream.prototype._sendAll = function (nodes, force, useToken) { - var sent = 0 - var free = Math.max(0, this._concurrency - this._dht.socket.inflight) - - if (!free && !this._inflight) free = 1 - if (!free) return 0 - - for (var i = 0; i < nodes.length; i++) { - if (this._send(nodes[i], force, useToken)) { - if (++sent === free) break - } - } - - return sent -} - -QueryStream.prototype._send = function (node, force, useToken) { - if (!force) { - if (node.queried) return false - node.queried = true - } - - this._inflight++ - - var query = this.query - - if (useToken && node.roundtripToken) { - query = { - command: this.query.command, - id: this.query.id, - target: this.query.target, - value: this.query.value, - roundtripToken: node.roundtripToken - } - } - - this._dht._request(query, node, false, this.holepunching ? this._onresponseholepunch : this._onresponse) - return true -} - -QueryStream.prototype._addPending = function (node, ref) { - if (node.id.equals(this._dht.id)) return - node.distance = xor(this.target, node.id) - node.referrer = ref - insertSorted(node, this._k, this._pending) -} - -QueryStream.prototype._addClosest = function (res, peer) { - if (!res.id || !res.roundtripToken || res.id.equals(this._dht.id)) return - - var prev = getNode(res.id, this._pending) - - if (!prev) { - prev = { - id: res.id, - port: peer.port, - host: peer.host, - distance: xor(res.id, this.target) - } - } - - prev.roundtripToken = res.roundtripToken - insertSorted(prev, this._k, this._closest) -} - -function decodeNodes (buf) { - if (!buf) return [] - try { - return nodes.decode(buf) - } catch (err) { - return [] - } -} - -function getNode (id, list) { - // find id in the list. - // technically this would be faster with binary search (against distance) - // but this list is always small, so meh - for (var i = 0; i < list.length; i++) { - if (list[i].id.equals(id)) return list[i] - } - - return null -} - -function validateId (id) { - return id && id.length === 32 -} - -function insertSorted (node, max, list) { - if (list.length === max && !xor.lt(node.distance, list[max - 1].distance)) return - if (getNode(node.id, list)) return - - if (list.length < max) list.push(node) - else list[max - 1] = node - - var pos = list.length - 1 - while (pos && xor.gt(list[pos - 1].distance, node.distance)) { - list[pos] = list[pos - 1] - list[pos - 1] = node - pos-- - } -} - -function copyNode (node) { - return { - id: node.id || BLANK, - port: node.port, - host: node.host, - roundtripToken: node.roundtripToken, - referrer: node.referrer || node.referer - } -} - -function echo (a) { - return a -} diff --git a/schema.proto b/schema.proto index 38e299b..7c434e3 100644 --- a/schema.proto +++ b/schema.proto @@ -1,16 +1,27 @@ -message Request { - required string command = 1; - optional bytes id = 2; - optional bytes target = 3; - optional bytes forwardRequest = 4; - optional bytes forwardResponse = 5; - optional bytes roundtripToken = 6; - optional bytes value = 7; +message Holepunch { + optional bytes from = 2; + optional bytes to = 3; +} + +enum TYPE { + QUERY = 1; + UPDATE = 2; + RESPONSE = 3; } -message Response { - optional bytes id = 1; - optional bytes nodes = 2; - optional bytes value = 3; - optional bytes roundtripToken = 4; +message Message { + // request/response type + id + required TYPE type = 1; + required uint64 rid = 2; + + // kademlia stuff + optional bytes id = 3; + optional bytes target = 4; + optional bytes closerNodes = 5; + optional bytes roundtripToken = 6; + + // rpc stuff + optional string command = 7; + optional string error = 8; + optional bytes value = 9; } diff --git a/test.js b/test.js index 2dbc432..bf3f027 100644 --- a/test.js +++ b/test.js @@ -1,26 +1,25 @@ -var tape = require('tape') -var dht = require('./') -var blake2b = require('./blake2b') +const tape = require('tape') +const dht = require('./') +const blake2b = require('./lib/blake2b') tape('simple update', function (t) { bootstrap(function (port, node) { - var a = dht({bootstrap: port}) - var b = dht({bootstrap: port}) - - a.on('update:echo', function (data, callback) { - t.ok(data.roundtripToken, 'has roundtrip token') - t.same(data.value, Buffer.from('Hello, World!'), 'expected data') - callback(null, data.value) + const a = dht({ bootstrap: port }) + const b = dht({ bootstrap: port }) + + a.command('echo', { + query (data, callback) { + t.fail('should not query') + callback(new Error('nope')) + }, + update (data, callback) { + t.same(data.value, Buffer.from('Hello, World!'), 'expected data') + callback(null, data.value) + } }) a.ready(function () { - var data = { - command: 'echo', - target: a.id, - value: Buffer.from('Hello, World!') - } - - b.update(data, function (err, responses) { + b.update('echo', a.id, Buffer.from('Hello, World!'), function (err, responses) { a.destroy() b.destroy() node.destroy() @@ -36,21 +35,18 @@ tape('simple update', function (t) { tape('simple query', function (t) { bootstrap(function (port, node) { - var a = dht({bootstrap: port}) - var b = dht({bootstrap: port}) + const a = dht({ bootstrap: port }) + const b = dht({ bootstrap: port }) - a.on('query:hello', function (data, callback) { - t.same(data.value, null, 'expected data') - callback(null, Buffer.from('world')) + a.command('hello', { + query (data, callback) { + t.same(data.value, null, 'expected data') + callback(null, Buffer.from('world')) + } }) a.ready(function () { - var data = { - command: 'hello', - target: a.id - } - - b.query(data, function (err, responses) { + b.query('hello', a.id, function (err, responses) { a.destroy() b.destroy() node.destroy() @@ -64,91 +60,34 @@ tape('simple query', function (t) { }) }) -tape('targeted query', function (t) { - bootstrap(function (port, node) { - var a = dht({bootstrap: port}) - - a.on('query:echo', function (data, cb) { - t.pass('in echo') - cb(null, data.value) - }) - - var b = dht({bootstrap: port}) - - b.on('query:echo', function (data, cb) { - t.fail('should not hit me') - cb() - }) - - a.ready(function () { - b.ready(function () { - var client = dht({bootstrap: port}) - - client.query({ - command: 'echo', - value: Buffer.from('hi'), - target: client.id - }, { - node: { - port: a.address().port, - host: '127.0.0.1' - } - }, function (err, responses) { - client.destroy() - a.destroy() - b.destroy() - node.destroy() - - t.error(err, 'no error') - t.same(responses.length, 1, 'one response') - t.same(responses[0].value, Buffer.from('hi'), 'echoed') - t.end() - }) - }) - }) - }) -}) - -tape('targeted update', function (t) { +tape('query and update', function (t) { bootstrap(function (port, node) { - var a = dht({bootstrap: port}) - - a.on('update:echo', function (data, cb) { - t.pass('in echo') - cb(null, data.value) - }) - - var b = dht({bootstrap: port}) - - b.on('update:echo', function (data, cb) { - t.fail('should not hit me') - cb() + const a = dht({ bootstrap: port }) + const b = dht({ bootstrap: port }) + + a.command('hello', { + query (data, callback) { + t.same(data.value, null, 'expected query data') + callback(null, Buffer.from('world')) + }, + update (data, callback) { + t.same(data.value, null, 'expected update data') + callback(null, Buffer.from('world')) + } }) a.ready(function () { - b.ready(function () { - var client = dht({bootstrap: port}) - - client.update({ - command: 'echo', - value: Buffer.from('hi'), - target: client.id - }, { - node: { - port: a.address().port, - host: '127.0.0.1' - } - }, function (err, responses) { - client.destroy() - a.destroy() - b.destroy() - node.destroy() + b.queryAndUpdate('hello', a.id, function (err, responses) { + a.destroy() + b.destroy() + node.destroy() - t.error(err, 'no error') - t.same(responses.length, 1, 'one response') - t.same(responses[0].value, Buffer.from('hi'), 'echoed') - t.end() - }) + t.error(err, 'no errors') + t.same(responses.length, 2, 'two responses') + t.same(responses[0].value, Buffer.from('world'), 'responded') + t.same(responses[1].value, Buffer.from('world'), 'responded') + t.ok(responses[0].type !== responses[1].type, 'not the same type') + t.end() }) }) }) @@ -156,22 +95,23 @@ tape('targeted update', function (t) { tape('swarm query', function (t) { bootstrap(function (port, node) { - var swarm = [] + const swarm = [] var closest = 0 loop() function done () { t.pass('created swarm') - var key = blake2b(Buffer.from('hello')) - var me = dht({bootstrap: port}) - me.update({command: 'kv', target: key, value: Buffer.from('hello')}, function (err, responses) { + const key = blake2b(Buffer.from('hello')) + const me = dht({ bootstrap: port }) + + me.update('kv', key, Buffer.from('hello'), function (err, responses) { t.error(err, 'no error') t.same(closest, 20, '20 closest nodes') t.same(responses.length, 20, '20 responses') - var stream = me.query({command: 'kv', target: key}) + const stream = me.query('kv', key) stream.on('data', function (data) { if (data.value) { @@ -190,18 +130,20 @@ tape('swarm query', function (t) { function loop () { if (swarm.length === 256) return done() - var node = dht({bootstrap: port}) + const node = dht({ bootstrap: port }) swarm.push(node) var value = null - node.on('update:kv', function (data, cb) { - closest++ - value = data.value - cb() - }) - node.on('query:kv', function (data, cb) { - cb(null, value) + node.command('kv', { + update (data, cb) { + closest++ + value = data.value + cb() + }, + query (data, cb) { + cb(null, value) + } }) node.ready(loop) @@ -209,12 +151,73 @@ tape('swarm query', function (t) { }) }) +tape('holepunch api', function (t) { + bootstrap(function (port, node) { + const a = dht({ bootstrap: port }) + const b = dht({ bootstrap: port }) + var holepunched = false + + a.ready(function () { + b.ready(function () { + node.on('holepunch', function (from, to) { + t.same(from.port, a.address().port) + t.same(to.port, b.address().port) + holepunched = true + }) + a.holepunch({ + host: '127.0.0.1', + port: b.address().port, + referrer: { + host: '127.0.0.1', + port: node.address().port + } + }, function (err) { + t.error(err, 'no error') + t.ok(holepunched) + t.end() + + node.destroy() + a.destroy() + b.destroy() + }) + }) + }) + }) +}) + +tape('timeouts', function (t) { + bootstrap(function (port, node) { + const a = dht({ bootstrap: port, ephemeral: true }) + const b = dht({ bootstrap: port }) + + var tries = 0 + + b.command('nope', { + update (query, cb) { + tries++ + t.pass('ignoring update') + } + }) + + b.ready(function () { + a.update('nope', Buffer.alloc(32), function (err) { + t.ok(err, 'errored') + t.same(tries, 3) + t.end() + node.destroy() + a.destroy() + b.destroy() + }) + }) + }) +}) + function bootstrap (done) { - var node = dht({ + const node = dht({ ephemeral: true }) - node.listen(function () { + node.listen(0, function () { done(node.address().port, node) }) }