Browse Source

Next version (#2)

* rebase on top old history

* fix dep

* more missing deps

* updated docs

* impl readme api

* final tweaks

* more tests

* revert version

* remove dead code
v4
Mathias Buus 6 years ago
committed by GitHub
parent
commit
3a05e9c03e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      .gitignore
  2. 199
      README.md
  3. 94
      example.js
  4. 626
      index.js
  5. 0
      lib/blake2b.js
  6. 299
      lib/io.js
  7. 272
      lib/messages.js
  8. 264
      lib/query-stream.js
  9. 69
      lib/query-table.js
  10. 45
      package.json
  11. 304
      query-stream.js
  12. 37
      schema.proto
  13. 249
      test.js

1
.gitignore

@ -1 +1,2 @@
node_modules
sandbox.js

199
README.md

@ -5,8 +5,15 @@ Make RPC calls over a [Kademlia](https://pdos.csail.mit.edu/~petar/papers/maymou
```
npm install dht-rpc
```
[![build status](http://img.shields.io/travis/mafintosh/dht-rpc.svg?style=flat)](http://travis-ci.org/mafintosh/dht-rpc)
## Key Features
* UDP hole punching support
* Easily add any command to your DHT
* Streaming queries and updates
## Usage
Here is an example implementing a simple key value store
@ -14,10 +21,10 @@ Here is an example implementing a simple key value store
First spin up a bootstrap node. You can make multiple if you want for redundancy.
``` js
var dht = require('dht-rpc')
const dht = require('dht-rpc')
// Set ephemeral: true so other peers do not add us to the peer list, simply bootstrap
var bootstrap = dht({ephemeral: true})
const bootstrap = dht({ ephemeral: true })
bootstrap.listen(10001)
```
@ -25,39 +32,42 @@ bootstrap.listen(10001)
Now lets make some dht nodes that can store values in our key value store.
``` js
var dht = require('dht-rpc')
var crypto = require('crypto')
const dht = require('dht-rpc')
const crypto = require('crypto')
// Let's create 100 dht nodes for our example.
for (var i = 0; i < 100; i++) createNode()
function createNode () {
var node = dht({
bootstrap: ['localhost:10001']
})
var values = {}
// When we are the closest node and someone is sending us a "store" command
node.on('update:values', function (query, cb) {
if (!query.value) return cb()
// Use the hash of the value as the key
var key = sha256(query.value).toString('hex')
values[key] = query.value
console.log('Storing', key, '-->', query.value.toString())
cb()
const node = dht({
bootstrap: [
'localhost:10001'
]
})
// When someone is querying for a "lookup" command
node.on('query:values', function (query, cb) {
var value = values[query.target.toString('hex')]
cb(null, value)
const values = new Map()
node.command('values', {
// When we are the closest node and someone is sending us a "store" command
update (query, cb) {
if (!query.value) return cb()
// Use the hash of the value as the key
const key = sha256(query.value).toString('hex')
values.set(key, query.value)
console.log('Storing', key, '-->', query.value.toString())
cb()
},
// When someone is querying for a "lookup" command
query (query, cb) {
const value = values[query.target.toString('hex')]
cb(null, value)
}
})
}
function sha256 (val) {
return crypto.createHash('sha256').update(val).digest('hex')
return crypto.createHash('sha256').update(val).digest()
}
```
@ -65,9 +75,9 @@ To insert a value into this dht make another script that does this following
``` js
// Set ephemeral: true as we are not part of the network.
var node = dht({ephemeral: true})
const node = dht({ ephemeral: true })
node.update({command: 'values', target: sha256(val), value: val}, function (err, res) {
node.update('values', sha256(val), value, function (err, res) {
if (err) throw err
console.log('Inserted', sha256(val).toString('hex'))
})
@ -76,7 +86,7 @@ node.update({command: 'values', target: sha256(val), value: val}, function (err,
Then after inserting run this script to query for a value
``` js
node.query({command: 'values', target: new Buffer(hexFromAbove, 'hex')})
node.query('values', Buffer.from(hexFromAbove, 'hex'))
.on('data', function (data) {
if (data.value && sha256(data.value).toString('hex') === hexFromAbove) {
// We found the value! Destroy the query stream as there is no need to continue.
@ -91,74 +101,135 @@ node.query({command: 'values', target: new Buffer(hexFromAbove, 'hex')})
## API
#### `var node = dht([options])`
#### `const node = dht([options])`
Create a new DHT node. Options include
Create a new DHT node.
``` js
Options include:
```js
{
id: nodeId, // id of the node
ephemeral: false, // will this node answer queries?
bootstrap: ['host:port'], // bootstrap nodes
socket: udpSocket // optional udp socket
// Whether or not this node is ephemeral or should join the routing table
ephemeral: false,
// A list of bootstrap nodes
bootstrap: [ 'bootstrap-node.com:24242', ... ],
// Optionally pass in your own UDP socket to use.
socket: udpSocket
}
```
#### `var stream = node.query(query, [options], [callback])`
#### `node.command(name, cmd)`
Create a new query. Query should look like this
Define a new RPC command. `cmd` should look like this
``` js
```js
{
command: 'command-to-run',
target: new Buffer('32 byte target'),
value: new Buffer('some payload')
// Query handler
query (query, cb),
// Update handler. only triggered when we are one of the closest nodes to the target
update (query, cb),
// Optional value encoding for the query/update incoming value. Defaults to binary.
inputEncoding: 'json', 'utf-8', object,
// Optional value encoding for the query/update outgoing value. Defaults to binary.
outputEncoding: (same as above),
valueEncoding: (sets both input/output encoding to this)
}
```
And options include
The `query` object in the query/update function looks like this:
``` js
```js
{
// always the same as your command def
command: 'command-name',
// the node who sent the query/update
node: { port, host, id },
// the query/update target (32 byte target)
target: Buffer,
// the query/update payload decoded with the inputEncoding
value
}
```
You should call the query/update callback with `(err, value)` where
value will be encoded using the outputEncoding and returned to the node.
#### `const stream = node.query(name, target, [value], [callback])`
Send a query command.
If you set a valueEncoding when defining the command the value will be encoded.
Returns a result stream that emits data that looks like this:
```js
{
nodes: [{host: 'example.com', port: 4224}], // only contact these nodes
holepunching: true // set to false to disable hole punching
// was this a query/update response
type: dht.QUERY,
// who sent this response
node: { peer, host, id },
// the response payload decoded using the outputEncoding
value
}
```
The stream will emit query results as they arrive. If you backpressure the query it will backpressure the query as well.
Call `.destroy()` on the stream to cancel the query. If you pass the callback the streams payload will be buffered and passed to that.
If you pass a callback the stream will be error handled and buffered
and the content passed as an array.
#### `const stream = node.update(name, target, [value], [callback])`
Send a update command
#### `var stream = node.update(query, [options], [callback])`
Same options/results as above but the response data will have `type`
set to `dht.UPDATE`.
Same as a query but will trigger an update query on the 20 closest nodes (distance between node ids and target) after the query finishes.
Per default the stream will only contain results from the closest query. To include the query results also pass the `query: true` option.
#### `const stream = node.queryAndUpdate(name, target, [value], [callback])`
#### `node.on('query:{command}', data, callback)`
Send a combined query and update command.
Called when a specific query is invoked on a node. `data` contains the same values as in the query above and also a `.node` property with info about the node invoking the query.
Will keep querying until it finds the closest nodes to the target and then
issue an update. More efficient than doing a query/update yourself.
Call the callback with `(err, value)` to respond.
Same options/results as above but the response data will include both
query and update results.
#### `node.on('update:{command}', data, callback)`
#### `node.destroy(onclose)`
Called when an update query is invoked. The `data.node` is also guaranteed to have roundtripped to this dht before, meaning that you can trust that the host, port was not spoofed.
Fully destroys the dht node.
#### `node.ready(callback)`
#### `node.bootstrap(cb)`
Makes sure the initial bootstrap table has been built. You do not need to wait for this before querying.
Re-bootstrap the DHT node. Normally you shouldn't have to call this.
#### `node.holepunch(peer, cb)`
UDP holepunch to another peer. The DHT does this automatically
when it cannot reach another peer but you can use this yourself also.
Peer should look like this:
```js
{
port,
host,
// referrer should be the node/peer that
// told you about this node.
referrer: { port, host }
}
```
#### `node.bootstrap([callback])`
#### `node.listen([port], [address], [onlistening])`
Rebootstrap your node. Call this at regular intervals if you aren't doing any other queries.
Explicitly bind the dht node to a certain port/address.
#### `node.holepunch(peer, referrer, callback)`
#### `node.on('listening')`
UDP hole punch to another peer using the `referrer` as a STUN server.
Emitted when the node starts listening on a udp port.
#### `node.destroy()`
#### `node.on('close')`
Destroy the dht node. Releases all resources.
Emitted when the node is fully closed.
## License
#### `node.on('holepunch', fromPeer, toPeer)`
MIT
Emitted when the node is helping `fromPeer` udp holepunch to `toPeer`.

94
example.js

@ -1,46 +1,50 @@
var dht = require('./')
var blake2b = require('./blake2b')
var node = dht({
bootstrap: 'localhost:49737',
ephemeral: !!process.argv[2]
})
var values = {}
node.on('update:store', function (query, cb) {
console.log('(onupdate)')
if (!query.value) return cb()
var key = blake2b(query.value).toString('hex')
values[key] = query.value
console.log('Storing', key, '-->', query.value.toString())
cb()
})
node.on('query:lookup', function (query, cb) {
console.log('(onquery)')
var value = values[query.target.toString('hex')]
cb(null, value)
})
if (process.argv.length > 3) {
var val = process.argv.slice(3).join(' ')
if (process.argv[2] === 'put') {
node.update({command: 'store', target: blake2b(Buffer.from(val)), value: val}, function (err) {
if (err) throw err
console.log('Inserted', blake2b(Buffer.from(val)).toString('hex'))
})
}
if (process.argv[2] === 'get') {
node.query({command: 'lookup', target: Buffer.from(val, 'hex')})
.on('data', function (data) {
if (data.value && blake2b(data.value).toString('hex') === val) {
console.log(val, '-->', data.value.toString())
this.destroy()
}
})
.on('end', function () {
console.log('(query finished)')
})
}
const dht = require('./')
const bootstrap = dht()
bootstrap.listen(10001)
const nodes = []
var swarm = 1000
loop(null)
function loop (err) {
if (err) throw err
if (swarm--) addNode(loop)
else done()
}
function done () {
console.log('executing hi update')
const i = Math.floor(Math.random() * nodes.length)
const rs = nodes[i].update('hi', Buffer.alloc(32))
rs.resume()
rs.on('end', function () {
setTimeout(done, 2000)
})
}
function addNode (cb) {
const node = dht({
bootstrap: [
10001
]
})
var hits = 0
node.command('hi', {
update (query, cb) {
console.log('hi', ++hits)
cb(null)
},
query (query, cb) {
cb(null)
}
})
node.once('ready', function () {
nodes.push(node)
cb()
})
}

626
index.js

@ -1,446 +1,384 @@
var udp = require('udp-request')
var KBucket = require('k-bucket')
var inherits = require('inherits')
var events = require('events')
var peers = require('ipv4-peers')
var collect = require('stream-collector')
var sodium = require('sodium-universal')
var tos = require('time-ordered-set')
var nodes = peers.idLength(32)
var messages = require('./messages')
var queryStream = require('./query-stream')
var blake2b = require('./blake2b')
module.exports = DHT
function DHT (opts) {
if (!(this instanceof DHT)) return new DHT(opts)
if (!opts) opts = {}
events.EventEmitter.call(this)
var self = this
this.concurrency = opts.concurrency || 16
this.id = opts.id || randomBytes(32)
this.ephemeral = !!opts.ephemeral
this.bucket = new KBucket({localNodeId: this.id, arbiter: arbiter})
this.bucket.on('ping', onnodeping)
this.inflightQueries = 0
this.socket = udp({
socket: opts.socket,
requestEncoding: messages.Request,
responseEncoding: messages.Response
})
this.socket.on('request', onrequest)
this.socket.on('response', onresponse)
this.socket.on('close', onclose)
this.nodes = tos()
this._bootstrap = [].concat(opts.bootstrap || []).map(parseAddr)
this._queryId = this.ephemeral ? null : this.id
this._bootstrapped = false
this._pendingRequests = []
this._tick = 0
this._secrets = [randomBytes(32), randomBytes(32)]
this._secretsInterval = setInterval(rotateSecrets, 5 * 60 * 1000)
this._tickInterval = setInterval(tick, 5 * 1000)
if (opts.nodes) {
for (var i = 0; i < opts.nodes.length; i++) {
this._addNode(opts.nodes[i].id, opts.nodes[i])
}
const { EventEmitter } = require('events')
const peers = require('ipv4-peers')
const dgram = require('dgram')
const sodium = require('sodium-universal')
const KBucket = require('k-bucket')
const tos = require('time-ordered-set')
const collect = require('stream-collector')
const codecs = require('codecs')
const { Message, Holepunch } = require('./lib/messages')
const IO = require('./lib/io')
const QueryStream = require('./lib/query-stream')
const blake2b = require('./lib/blake2b')
const UNSUPPORTED_COMMAND = new Error('Unsupported command')
const nodes = peers.idLength(32)
exports = module.exports = opts => new DHT(opts)
class DHT extends EventEmitter {
constructor (opts) {
if (!opts) opts = {}
super()
this.bootstrapped = false
this.destroyed = false
this.concurrency = 16
this.socket = dgram.createSocket('udp4')
this.id = randomBytes(32)
this.inflightQueries = 0
this.ephemeral = !!opts.ephemeral
this.nodes = tos()
this.bucket = new KBucket({ localNodeId: this.id })
this.bucket.on('ping', this._onnodeping.bind(this))
this.bootstrapNodes = [].concat(opts.bootstrap || []).map(parsePeer)
this.socket.on('listening', this.emit.bind(this, 'listening'))
this.socket.on('close', this.emit.bind(this, 'close'))
const queryId = this.ephemeral ? null : this.id
const io = new IO(this.socket, queryId, this)
this._io = io
this._commands = new Map()
this._tick = 0
this._tickInterval = setInterval(this._ontick.bind(this), 5000)
process.nextTick(this.bootstrap.bind(this))
}
process.nextTick(function () {
self.bootstrap()
})
function rotateSecrets () {
self._rotateSecrets()
_ontick () {
this._tick++
if ((this._tick & 7) === 0) this._pingSome()
}
function onrequest (request, peer) {
self._onrequest(request, peer)
address () {
return this.socket.address()
}
function onresponse (response, peer) {
self._onresponse(response, peer)
command (name, opts) {
this._commands.set(name, {
inputEncoding: codecs(opts.inputEncoding || opts.valueEncoding),
outputEncoding: codecs(opts.outputEncoding || opts.valueEncoding),
query: opts.query || queryNotSupported,
update: opts.update || updateNotSupported
})
}
function onnodeping (oldContacts, newContact) {
self._onnodeping(oldContacts, newContact)
ready (onready) {
if (!this.bootstrapped) this.once('ready', onready)
else onready()
}
function onclose () {
while (self._pendingRequests.length) {
self._pendingRequests.shift().callback(new Error('Request cancelled'))
onrequest (type, message, peer) {
if (validateId(message.id)) {
this._addNode(message.id, peer, null)
}
self.emit('close')
}
function tick () {
self._tick++
if ((self._tick & 7) === 0) self._pingSome()
}
}
switch (message.command) {
case '_ping':
return this._onping(message, peer)
inherits(DHT, events.EventEmitter)
case '_find_node':
return this._onfindnode(message, peer)
DHT.prototype.ready = function (cb) {
if (!this._bootstrapped) this.once('ready', cb)
else cb()
}
case '_holepunch':
return this._onholepunch(message, peer)
DHT.prototype.query = function (query, opts, cb) {
if (typeof opts === 'function') return this.query(query, null, opts)
return collect(queryStream(this, query, opts), cb)
}
DHT.prototype.update = function (query, opts, cb) {
if (typeof opts === 'function') return this.update(query, null, opts)
if (!opts) opts = {}
if (opts.query) opts.verbose = true
opts.token = true
return collect(queryStream(this, query, opts), cb)
}
DHT.prototype._pingSome = function () {
var cnt = this.inflightQueries > 2 ? 1 : 3
var oldest = this.nodes.oldest
while (cnt--) {
if (!oldest || this._tick - oldest.tick < 3) continue
this._check(oldest)
oldest = oldest.next
default:
return this._oncommand(type, message, peer)
}
}
}
DHT.prototype.holepunch = function (peer, referrer, cb) {
peer = parseAddr(peer)
referrer = parseAddr(referrer)
this._ping(peer, noop)
this._holepunch(peer, referrer, cb)
}
DHT.prototype.ping = function (peer, cb) {
this._ping(parseAddr(peer), function (err, res, peer) {
if (err) return cb(err)
var rinfo = decodePeer(res.value)
if (!rinfo) return cb(new Error('Invalid pong'))
cb(null, rinfo, {port: peer.port, host: peer.host, id: res.id})
})
}
_onping (message, peer) {
if (message.value && !this.id.equals(message.value)) return
this._io.response(message, peers.encode([ peer ]), null, peer)
}
DHT.prototype.toArray = function () {
return this.bucket.toArray()
}
_onholepunch (message, peer) {
const value = decodeHolepunch(message.value)
if (!value) return
if (value.to) {
const to = decodePeer(value.to)
if (!to || samePeer(to, peer)) return
message.id = this._io.id
message.value = Holepunch.encode({ from: peers.encode([ peer ]) })
this.emit('holepunch', peer, to)
this._io.send(Message.encode(message), to)
return
}
DHT.prototype.destroy = function () {
clearInterval(this._secretsInterval)
clearInterval(this._tickInterval)
this.socket.destroy()
}
if (value.from) {
const from = decodePeer(value.from)
if (from) peer = from
}
DHT.prototype.address = function () {
return this.socket.address()
}
this._io.response(message, null, null, peer)
}
DHT.prototype._rotateSecrets = function () {
var secret = randomBytes(32)
this._secrets[1] = this._secrets[0]
this._secrets[0] = secret
}
_onfindnode (message, peer) {
if (!validateId(message.target)) return
DHT.prototype.bootstrap = function (cb) {
var self = this
const closerNodes = nodes.encode(this.bucket.closest(message.target, 20))
this._io.response(message, null, closerNodes, peer)
}
if (!this._bootstrap.length) return process.nextTick(done)
_oncommand (type, message, peer) {
if (!message.target) return
var backgroundCon = Math.min(self.concurrency, Math.max(2, Math.floor(self.concurrency / 8)))
var qs = this.query({
command: '_find_node',
target: this.id
})
const self = this
const cmd = this._commands.get(message.command)
qs.on('data', update)
qs.on('error', onerror)
qs.on('end', done)
if (!cmd) return reply(UNSUPPORTED_COMMAND)
update()
const query = {
type,
command: message.command,
node: peer,
target: message.target,
value: cmd.inputEncoding.decode(message.value)
}
function onerror (err) {
if (cb) cb(err)
}
if (type === IO.UPDATE) cmd.update(query, reply)
else cmd.query(query, reply)
function done () {
if (!self._bootstrapped) {
self._bootstrapped = true
self.emit('ready')
function reply (err, value) {
const closerNodes = nodes.encode(self.bucket.closest(message.target, 20))
if (err) return self._io.error(message, err, closerNodes, peer)
self._io.response(message, value && cmd.outputEncoding.encode(value), closerNodes, peer)
}
if (cb) cb()
}
function update () {
qs._concurrency = self.inflightQueries === 1 ? self.concurrency : backgroundCon
onresponse (message, peer) {
if (validateId(message.id)) {
this._addNode(message.id, peer, message.roundtripToken)
}
}
}
DHT.prototype._ping = function (peer, cb) {
this._request({command: '_ping', id: this._queryId}, peer, false, cb)
}
DHT.prototype._holepunch = function (peer, referrer, cb) {
// Expects the caller to have already sent a message to peer to open the firewall session
this._request({command: '_ping', id: this._queryId, forwardRequest: encodePeer(peer)}, referrer, false, cb)
}
DHT.prototype._request = function (request, peer, important, cb) {
if (this.socket.inflight >= this.concurrency || this._pendingRequests.length) {
this._pendingRequests.push({request: request, peer: peer, callback: cb})
} else {
this.socket.request(request, peer, cb)
holepunch (peer, cb) {
if (!peer.referrer) throw new Error('peer.referrer is required')
this._io.query('_holepunch', null, null, peer, cb)
}
}
DHT.prototype._onrequest = function (request, peer) {
if (validateId(request.id)) this._addNode(request.id, peer, request.roundtripToken)
if (request.roundtripToken) {
if (!request.roundtripToken.equals(this._token(peer, 0))) {
if (!request.roundtripToken.equals(this._token(peer, 1))) {
request.roundtripToken = null
}
}
destroy () {
this.destroyed = true
this._io.destroy()
clearInterval(this._tickInterval)
}
if (request.forwardRequest) {
this._forwardRequest(request, peer)
return
ping (peer, cb) {
this._io.query('_ping', null, peer.id, peer, function (err, res) {
if (err) return cb(err)
if (res.error) return cb(new Error(res.error))
const pong = decodePeer(res.value)
if (!pong) return cb(new Error('Invalid pong'))
cb(null, pong)
})
}
if (request.forwardResponse) peer = this._forwardResponse(request, peer)
_addNode (id, peer, token) {
if (id.equals(this.id)) return
switch (request.command) {
case '_ping': return this._onping(request, peer)
case '_find_node': return this._onfindnode(request, peer)
}
var node = this.bucket.get(id)
const fresh = !node
this._onquery(request, peer)
}
if (!node) node = {}
DHT.prototype._forwardResponse = function (request, peer) {
if (request.command !== '_ping') return // only allow ping for now
node.id = id
node.port = peer.port
node.host = peer.host
if (token) node.roundtripToken = token
node.tick = this._tick
try {
var from = peers.decode(request.forwardResponse)[0]
if (!from) return
} catch (err) {
return
if (!fresh) this.nodes.remove(node)
this.nodes.add(node)
this.bucket.add(node)
if (fresh) this.emit('add-node', node)
}
from.request = true
from.tid = peer.tid
return from
}
DHT.prototype._forwardRequest = function (request, peer) {
if (request.command !== '_ping') return // only allow ping forwards right now
try {
var to = peers.decode(request.forwardRequest)[0]
if (!to) return
} catch (err) {
return
_removeNode (node) {
this.nodes.remove(node)
this.bucket.remove(node.id)
this.emit('remove-node')
}
this.emit('holepunch', peer, to)
request.forwardRequest = null
request.forwardResponse = encodePeer(peer)
this.socket.forwardRequest(request, peer, to)
}
DHT.prototype._onquery = function (request, peer) {
if (!validateId(request.target)) return
var self = this
var query = {
node: {
id: request.id,
port: peer.port,
host: peer.host
},
command: request.command,
target: request.target,
value: request.value,
roundtripToken: request.roundtripToken
_token (peer, i) {
return blake2b.batch([
this._secrets[i],
Buffer.from(peer.host)
])
}
var method = request.roundtripToken ? 'update' : 'query'
_onnodeping (oldContacts, newContact) {
// if bootstrapping, we've recently pinged all nodes
if (!this.bootstrapped) return
const reping = []
if (!this.emit(method + ':' + request.command, query, callback) && !this.emit(method, query, callback)) callback()
for (var i = 0; i < oldContacts.length; i++) {
const old = oldContacts[i]
function callback (err, value) {
if (err) return
// check if we recently talked to this peer ...
if (this._tick === old.tick) {
this.bucket.add(oldContacts[i])
continue
}
var res = {
id: self._queryId,
value: value || null,
nodes: nodes.encode(self.bucket.closest(request.target, 20)),
roundtripToken: self._token(peer, 0)
reping.push(old)
}
self.socket.response(res, peer)
if (reping.length) this._reping(reping, newContact)
}
}
DHT.prototype._onresponse = function (response, peer) {
if (validateId(response.id)) this._addNode(response.id, peer, response.roundtripToken)
while (this.socket.inflight < this.concurrency && this._pendingRequests.length) {
var next = this._pendingRequests.shift()
this.socket.request(next.request, next.peer, next.callback)
_check (node) {
const self = this
this.ping(node, function (err) {
if (err) self._removeNode(node)
})
}
}
DHT.prototype._onping = function (request, peer) {
var res = {
id: this._queryId,
value: encodePeer(peer),
roundtripToken: this._token(peer, 0)
}
_reping (oldContacts, newContact) {
const self = this
this.socket.response(res, peer)
}
ping()
DHT.prototype._onfindnode = function (request, peer) {
if (!validateId(request.target)) return
function ping () {
const next = oldContacts.shift()
if (!next) return
self._io.queryImmediately('_ping', null, next.id, next, afterPing)
}
var res = {
id: this._queryId,
nodes: nodes.encode(this.bucket.closest(request.target, 20)),
roundtripToken: this._token(peer, 0)
function afterPing (err, res, node) {
if (!err) return ping()
self._removeNode(node)
self.bucket.add(newContact)
}
}
this.socket.response(res, peer)
}
DHT.prototype._onnodeping = function (oldContacts, newContact) {
if (!this._bootstrapped) return // bootstrapping, we've recently pinged all nodes
_pingSome () {
var cnt = this.inflightQueries > 2 ? 1 : 3
var oldest = this.nodes.oldest
var reping = []
for (var i = 0; i < oldContacts.length; i++) {
var old = oldContacts[i]
if (this._tick - old.tick < 3) { // less than 10s since we talked to this peer ...
this.bucket.add(oldContacts[i])
continue
while (cnt--) {
if (!oldest || this._tick === oldest.tick) continue
this._check(oldest)
oldest = oldest.next
}
reping.push(old)
}
if (reping.length) this._reping(reping, newContact)
}
DHT.prototype._check = function (node) {
var self = this
this._request({command: '_ping', id: this._queryId}, node, false, function (err) {
if (err) self._removeNode(node)
})
}
DHT.prototype._reping = function (oldContacts, newContact) {
var self = this
var next = null
query (command, target, value, cb) {
if (typeof value === 'function') return this.query(command, target, null, value)
return collect(this.runCommand(command, target, value, { query: true, update: false }), cb)
}
ping()
update (command, target, value, cb) {
if (typeof value === 'function') return this.update(command, target, null, value)
return collect(this.runCommand(command, target, value, { query: false, update: true }), cb)
}
function ping () {
next = oldContacts.shift()
if (next) self._request({command: '_ping', id: self._queryId}, next, true, afterPing)
queryAndUpdate (command, target, value, cb) {
if (typeof value === 'function') return this.queryAndUpdate(command, target, null, value)
return collect(this.runCommand(command, target, value, { query: true, update: true }), cb)
}
function afterPing (err) {
if (!err) return ping()
runCommand (command, target, value, opts) {
return new QueryStream(this, command, target, value, opts)
}
self._removeNode(next)
self.bucket.add(newContact)
listen (port, addr, cb) {
if (typeof port === 'function') return this.listen(0, null, port)
if (typeof addr === 'function') return this.listen(port, null, addr)
if (cb) this.once('listening', cb)
this.socket.bind(port, addr)
}
}
DHT.prototype._token = function (peer, i) {
return blake2b.batch([this._secrets[i], Buffer.from(peer.host)])
}
bootstrap (cb) {
const self = this
const backgroundCon = Math.min(this.concurrency, Math.max(2, Math.floor(this.concurrency / 8)))
DHT.prototype._addNode = function (id, peer, token) {
if (id.equals(this.id)) return
if (!this.bootstrapNodes.length) return process.nextTick(done)
var node = this.bucket.get(id)
var fresh = !node
const qs = this.query('_find_node', this.id)
if (!node) node = {}
qs.on('data', update)
qs.on('error', onerror)
qs.on('end', done)
node.id = id
node.port = peer.port
node.host = peer.host
node.roundtripToken = token
node.tick = this._tick
update()
if (!fresh) this.nodes.remove(node)
this.nodes.add(node)
function onerror (err) {
if (cb) cb(err)
}
function done () {
if (!self.bootstrapped) {
self.bootstrapped = true
self.emit('ready')
}
if (cb) cb()
}
this.bucket.add(node)
if (fresh) this.emit('add-node', node)
function update () {
qs._concurrency = self.inflightQueries === 1 ? self.concurrency : backgroundCon
}
}
}
DHT.prototype._removeNode = function (node) {
this.nodes.remove(node)
this.bucket.remove(node.id)
this.emit('remove-node', node)
exports.QUERY = DHT.QUERY = IO.QUERY
exports.UPDATE = DHT.UPDATE = IO.UPDATE
exports.DHT = DHT
function validateId (id) {
return id && id.length === 32
}
DHT.prototype.listen = function (port, cb) {
this.socket.listen(port, cb)
function randomBytes (n) {
const buf = Buffer.allocUnsafe(n)
sodium.randombytes_buf(buf)
return buf
}
function encodePeer (peer) {
return peer && peers.encode([peer])
function decodeHolepunch (buf) {
try {
return Holepunch.decode(buf)
} catch (err) {
return null
}
}
function decodePeer (buf) {
try {
return buf && peers.decode(buf)[0]
const p = peers.decode(buf)[0]
if (!p) throw new Error('No peer in buffer')
return p
} catch (err) {
return null
}
}
function parseAddr (addr) {
if (typeof addr === 'object' && addr) return addr
if (typeof addr === 'number') return parseAddr(':' + addr)
if (addr[0] === ':') return parseAddr('127.0.0.1' + addr)
return {port: Number(addr.split(':')[1] || 3282), host: addr.split(':')[0]}
}
function parsePeer (peer) {
if (typeof peer === 'object' && peer) return peer
if (typeof peer === 'number') return parsePeer(':' + peer)
if (peer[0] === ':') return parsePeer('127.0.0.1' + peer)
function validateId (id) {
return id && id.length === 32
const parts = peer.split(':')
return {
host: parts[0],
port: parseInt(parts[1], 10)
}
}
function arbiter (incumbant, candidate) {
return candidate
function samePeer (a, b) {
return a.port === b.port && a.host === b.host
}
function randomBytes (n) {
var buf = Buffer.allocUnsafe(n)
sodium.randombytes_buf(buf)
return buf
function updateNotSupported (query, cb) {
cb(new Error('Update not supported'))
}
function noop () {}
function queryNotSupported (query, cb) {
cb(null, null)
}

0
blake2b.js → lib/blake2b.js

299
lib/io.js

@ -0,0 +1,299 @@
const { Message, Holepunch, TYPE } = require('./messages')
const blake2b = require('./blake2b')
const peers = require('ipv4-peers')
const sodium = require('sodium-universal')
const QUERY = Symbol('QUERY')
const UPDATE = Symbol('UPDATE')
const ECANCELLED = new Error('Request cancelled')
const ETIMEDOUT = new Error('Request timed out')
ETIMEDOUT.code = 'ETIMEDOUT'
ECANCELLED.code = 'ECANCELLED'
const TRIES = 3
class IO {
constructor (socket, id, ctx) {
this.id = id
this.socket = socket
this.inflight = []
this._ctx = ctx
this._rid = (Math.random() * 65536) | 0
this._requests = new Array(65536)
this._pending = []
this._secrets = [ randomBytes(32), randomBytes(32) ]
this._tickInterval = setInterval(this._ontick.bind(this), 250)
this._rotateInterval = setInterval(this._onrotate.bind(this), 300000)
socket.on('message', this._onmessage.bind(this))
}
_token (peer, i) {
return blake2b.batch([
this._secrets[i],
Buffer.from(peer.host)
])
}
_free () {
const rid = this._rid++
if (this._rid === 65536) this._rid = 0
return rid
}
/*
R
/ \
A B
A sent a message to B that failed
It could be that the message got dropped
or that it needs holepunching
To retry
resend(req, A -> B)
fire_and_forget({ _holepunch, to: B }, A -> R)
R.onholepunch { to: B } => fire_and_forget({ _holepunch, from: A }, R -> B)
B.onholepunch { from: A } => fire_and_forget({ _holepunch }, B -> A)
A and B is now holepunched and the session has been retried as well
*/
_holepunch (req) {
const rid = req.message.command === '_holepunch'
? req.rid
: this._free()
const punch = {
type: req.message.type,
rid,
command: '_holepunch',
value: Holepunch.encode({
to: peers.encode([ req.peer ])
})
}
this.send(Message.encode(punch), req.peer.referrer)
}
_retry (req) {
req.timeout = 4
this.send(req.buffer, req.peer)
// if referrer is avail, try holepunching automatically
if (req.peer.referrer) this._holepunch(req)
}
_onmessage (buf, rinfo) {
const message = decodeMessage(buf)
if (!message) return
const peer = { port: rinfo.port, host: rinfo.address }
switch (message.type) {
case TYPE.RESPONSE:
this._ctx.onresponse(message, peer)
this._finish(message.rid, null, message, peer)
break
case TYPE.QUERY:
this._ctx.onrequest(QUERY, message, peer)
break
case TYPE.UPDATE:
const rt = message.roundtripToken
if (!rt || (!rt.equals(this._token(peer, 0)) && !rt.equals(this._token(peer, 1)))) return
this._ctx.onrequest(UPDATE, message, peer)
break
}
}
_finish (rid, err, val, peer) {
const req = this._requests[rid]
if (!req) return
this._requests[rid] = undefined
const top = this.inflight[this.inflight.length - 1]
this.inflight[top.index = req.index] = top
this.inflight.pop()
const type = req.message.type === TYPE.QUERY
? QUERY
: UPDATE
req.callback(err, val, peer, req.message, req.peer, type)
while (this._pending.length && this.inflight.length < this._ctx.concurrency) {
const { message, peer, callback } = this._pending.shift()
this._requestImmediately(message, peer, callback)
}
}
_request (message, peer, callback) {
// Should we wait to send?
if (this._pending.length || (this.inflight.length >= this._ctx.concurrency)) {
this._pending.push({ message, peer, callback })
} else {
this._requestImmediately(message, peer, callback)
}
}
_requestImmediately (message, peer, callback) {
const rid = message.rid = this._free()
const buffer = Message.encode(message)
const req = {
rid,
index: this.inflight.length,
callback,
message,
buffer,
peer,
timeout: 4,
tries: 0
}
this._requests[rid] = req
this.inflight.push(req)
this.send(buffer, peer)
// if sending a holepunch cmd, forward it right away
if (message.command === '_holepunch') this._holepunch(req)
}
_cancel (rid, err) {
this._finish(rid, err || ECANCELLED, null, null)
}
_onrotate () {
this._secrets[1] = this._secrets[0]
this._secrets[0] = randomBytes(32)
}
_ontick () {
for (var i = this.inflight.length - 1; i >= 0; i--) {
const req = this.inflight[i]
if (req.timeout === 2 && ++req.tries < TRIES) {
this._retry(req)
continue
}
if (--req.timeout) {
continue
}
this._cancel(req.rid, ETIMEDOUT)
}
}
send (buffer, peer) {
this.socket.send(buffer, 0, buffer.length, peer.port, peer.host)
}
destroy () {
clearInterval(this._rotateInterval)
clearInterval(this._tickInterval)
this.socket.close()
const pending = this._pending
this._pending = []
for (const req of pending) req.callback(ECANCELLED)
for (const req of this.inflight) this._cancel(req.rid)
}
response (request, value, closerNodes, peer) {
const message = {
type: TYPE.RESPONSE,
rid: request.rid,
id: this.id,
closerNodes,
roundtripToken: this._token(peer, 0),
value
}
this.send(Message.encode(message), peer)
}
error (request, error, closerNodes, peer) {
const message = {
type: TYPE.RESPONSE,
rid: request.rid,
id: this.id,
closerNodes,
error: error.message
}
this.send(Message.encode(message), peer)
}
query (command, target, value, peer, callback) {
if (!callback) callback = noop
this._request({
type: TYPE.QUERY,
rid: 0,
id: this.id,
target,
command,
value
}, peer, callback)
}
queryImmediately (command, target, value, peer, callback) {
if (!callback) callback = noop
this._requestImmediately({
type: TYPE.QUERY,
rid: 0,
id: this.id,
target,
command,
value
}, peer, callback)
}
update (command, target, value, peer, callback) {
if (!callback) callback = noop
this._request({
type: TYPE.UPDATE,
rid: 0,
id: this.id,
roundtripToken: peer.roundtripToken,
target,
command,
value
}, peer, callback)
}
}
IO.QUERY = QUERY
IO.UPDATE = UPDATE
module.exports = IO
function noop () {}
function decodeMessage (buf) {
try {
return Message.decode(buf)
} catch (err) {
return null
}
}
function randomBytes (n) {
const buf = Buffer.allocUnsafe(32)
sodium.randombytes_buf(buf)
return buf
}

272
messages.js → lib/messages.js

@ -1,4 +1,4 @@
// This file is auto generated by the protocol-buffers cli tool
// This file is auto generated by the protocol-buffers compiler
/* eslint-disable quotes */
/* eslint-disable indent */
@ -10,60 +10,46 @@ var encodings = require('protocol-buffers-encodings')
var varint = encodings.varint
var skip = encodings.skip
var Request = exports.Request = {
exports.TYPE = {
"QUERY": 1,
"UPDATE": 2,
"RESPONSE": 3
}
var Holepunch = exports.Holepunch = {
buffer: true,
encodingLength: null,
encode: null,
decode: null
}
var Response = exports.Response = {
var Message = exports.Message = {
buffer: true,
encodingLength: null,
encode: null,
decode: null
}
defineRequest()
defineResponse()
defineHolepunch()
defineMessage()
function defineRequest () {
function defineHolepunch () {
var enc = [
encodings.string,
encodings.bytes
]
Request.encodingLength = encodingLength
Request.encode = encode
Request.decode = decode
Holepunch.encodingLength = encodingLength
Holepunch.encode = encode
Holepunch.decode = decode
function encodingLength (obj) {
var length = 0
if (!defined(obj.command)) throw new Error("command is required")
var len = enc[0].encodingLength(obj.command)
length += 1 + len
if (defined(obj.id)) {
var len = enc[1].encodingLength(obj.id)
length += 1 + len
}
if (defined(obj.target)) {
var len = enc[1].encodingLength(obj.target)
if (defined(obj.from)) {
var len = enc[0].encodingLength(obj.from)
length += 1 + len
}
if (defined(obj.forwardRequest)) {
var len = enc[1].encodingLength(obj.forwardRequest)
length += 1 + len
}
if (defined(obj.forwardResponse)) {
var len = enc[1].encodingLength(obj.forwardResponse)
length += 1 + len
}
if (defined(obj.roundtripToken)) {
var len = enc[1].encodingLength(obj.roundtripToken)
length += 1 + len
}
if (defined(obj.value)) {
var len = enc[1].encodingLength(obj.value)
if (defined(obj.to)) {
var len = enc[0].encodingLength(obj.to)
length += 1 + len
}
return length
@ -73,39 +59,15 @@ function defineRequest () {
if (!offset) offset = 0
if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj))
var oldOffset = offset
if (!defined(obj.command)) throw new Error("command is required")
buf[offset++] = 10
enc[0].encode(obj.command, buf, offset)
offset += enc[0].encode.bytes
if (defined(obj.id)) {
if (defined(obj.from)) {
buf[offset++] = 18
enc[1].encode(obj.id, buf, offset)
offset += enc[1].encode.bytes
enc[0].encode(obj.from, buf, offset)
offset += enc[0].encode.bytes
}
if (defined(obj.target)) {
if (defined(obj.to)) {
buf[offset++] = 26
enc[1].encode(obj.target, buf, offset)
offset += enc[1].encode.bytes
}
if (defined(obj.forwardRequest)) {
buf[offset++] = 34
enc[1].encode(obj.forwardRequest, buf, offset)
offset += enc[1].encode.bytes
}
if (defined(obj.forwardResponse)) {
buf[offset++] = 42
enc[1].encode(obj.forwardResponse, buf, offset)
offset += enc[1].encode.bytes
}
if (defined(obj.roundtripToken)) {
buf[offset++] = 50
enc[1].encode(obj.roundtripToken, buf, offset)
offset += enc[1].encode.bytes
}
if (defined(obj.value)) {
buf[offset++] = 58
enc[1].encode(obj.value, buf, offset)
offset += enc[1].encode.bytes
enc[0].encode(obj.to, buf, offset)
offset += enc[0].encode.bytes
}
encode.bytes = offset - oldOffset
return buf
@ -117,18 +79,11 @@ function defineRequest () {
if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid")
var oldOffset = offset
var obj = {
command: "",
id: null,
target: null,
forwardRequest: null,
forwardResponse: null,
roundtripToken: null,
value: null
from: null,
to: null
}
var found0 = false
while (true) {
if (end <= offset) {
if (!found0) throw new Error("Decoded message is not valid")
decode.bytes = offset - oldOffset
return obj
}
@ -136,34 +91,13 @@ function defineRequest () {
offset += varint.decode.bytes
var tag = prefix >> 3
switch (tag) {
case 1:
obj.command = enc[0].decode(buf, offset)
offset += enc[0].decode.bytes
found0 = true
break
case 2:
obj.id = enc[1].decode(buf, offset)
offset += enc[1].decode.bytes
obj.from = enc[0].decode(buf, offset)
offset += enc[0].decode.bytes
break
case 3:
obj.target = enc[1].decode(buf, offset)
offset += enc[1].decode.bytes
break
case 4:
obj.forwardRequest = enc[1].decode(buf, offset)
offset += enc[1].decode.bytes
break
case 5:
obj.forwardResponse = enc[1].decode(buf, offset)
offset += enc[1].decode.bytes
break
case 6:
obj.roundtripToken = enc[1].decode(buf, offset)
offset += enc[1].decode.bytes
break
case 7:
obj.value = enc[1].decode(buf, offset)
offset += enc[1].decode.bytes
obj.to = enc[0].decode(buf, offset)
offset += enc[0].decode.bytes
break
default:
offset = skip(prefix & 7, buf, offset)
@ -172,31 +106,52 @@ function defineRequest () {
}
}
function defineResponse () {
function defineMessage () {
var enc = [
encodings.bytes
encodings.enum,
encodings.varint,
encodings.bytes,
encodings.string
]
Response.encodingLength = encodingLength
Response.encode = encode
Response.decode = decode
Message.encodingLength = encodingLength
Message.encode = encode
Message.decode = decode
function encodingLength (obj) {
var length = 0
if (!defined(obj.type)) throw new Error("type is required")
var len = enc[0].encodingLength(obj.type)
length += 1 + len
if (!defined(obj.rid)) throw new Error("rid is required")
var len = enc[1].encodingLength(obj.rid)
length += 1 + len
if (defined(obj.id)) {
var len = enc[0].encodingLength(obj.id)
var len = enc[2].encodingLength(obj.id)
length += 1 + len
}
if (defined(obj.nodes)) {
var len = enc[0].encodingLength(obj.nodes)
if (defined(obj.target)) {
var len = enc[2].encodingLength(obj.target)
length += 1 + len
}
if (defined(obj.value)) {
var len = enc[0].encodingLength(obj.value)
if (defined(obj.closerNodes)) {
var len = enc[2].encodingLength(obj.closerNodes)
length += 1 + len
}
if (defined(obj.roundtripToken)) {
var len = enc[0].encodingLength(obj.roundtripToken)
var len = enc[2].encodingLength(obj.roundtripToken)
length += 1 + len
}
if (defined(obj.command)) {
var len = enc[3].encodingLength(obj.command)
length += 1 + len
}
if (defined(obj.error)) {
var len = enc[3].encodingLength(obj.error)
length += 1 + len
}
if (defined(obj.value)) {
var len = enc[2].encodingLength(obj.value)
length += 1 + len
}
return length
@ -206,25 +161,48 @@ function defineResponse () {
if (!offset) offset = 0
if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj))
var oldOffset = offset
if (!defined(obj.type)) throw new Error("type is required")
buf[offset++] = 8
enc[0].encode(obj.type, buf, offset)
offset += enc[0].encode.bytes
if (!defined(obj.rid)) throw new Error("rid is required")
buf[offset++] = 16
enc[1].encode(obj.rid, buf, offset)
offset += enc[1].encode.bytes
if (defined(obj.id)) {
buf[offset++] = 10
enc[0].encode(obj.id, buf, offset)
offset += enc[0].encode.bytes
buf[offset++] = 26
enc[2].encode(obj.id, buf, offset)
offset += enc[2].encode.bytes
}
if (defined(obj.nodes)) {
buf[offset++] = 18
enc[0].encode(obj.nodes, buf, offset)
offset += enc[0].encode.bytes
if (defined(obj.target)) {
buf[offset++] = 34
enc[2].encode(obj.target, buf, offset)
offset += enc[2].encode.bytes
}
if (defined(obj.value)) {
buf[offset++] = 26
enc[0].encode(obj.value, buf, offset)
offset += enc[0].encode.bytes
if (defined(obj.closerNodes)) {
buf[offset++] = 42
enc[2].encode(obj.closerNodes, buf, offset)
offset += enc[2].encode.bytes
}
if (defined(obj.roundtripToken)) {
buf[offset++] = 34
enc[0].encode(obj.roundtripToken, buf, offset)
offset += enc[0].encode.bytes
buf[offset++] = 50
enc[2].encode(obj.roundtripToken, buf, offset)
offset += enc[2].encode.bytes
}
if (defined(obj.command)) {
buf[offset++] = 58
enc[3].encode(obj.command, buf, offset)
offset += enc[3].encode.bytes
}
if (defined(obj.error)) {
buf[offset++] = 66
enc[3].encode(obj.error, buf, offset)
offset += enc[3].encode.bytes
}
if (defined(obj.value)) {
buf[offset++] = 74
enc[2].encode(obj.value, buf, offset)
offset += enc[2].encode.bytes
}
encode.bytes = offset - oldOffset
return buf
@ -236,13 +214,21 @@ function defineResponse () {
if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid")
var oldOffset = offset
var obj = {
type: 1,
rid: 0,
id: null,
nodes: null,
value: null,
roundtripToken: null
target: null,
closerNodes: null,
roundtripToken: null,
command: "",
error: "",
value: null
}
var found0 = false
var found1 = false
while (true) {
if (end <= offset) {
if (!found0 || !found1) throw new Error("Decoded message is not valid")
decode.bytes = offset - oldOffset
return obj
}
@ -251,20 +237,42 @@ function defineResponse () {
var tag = prefix >> 3
switch (tag) {
case 1:
obj.id = enc[0].decode(buf, offset)
obj.type = enc[0].decode(buf, offset)
offset += enc[0].decode.bytes
found0 = true
break
case 2:
obj.nodes = enc[0].decode(buf, offset)
offset += enc[0].decode.bytes
obj.rid = enc[1].decode(buf, offset)
offset += enc[1].decode.bytes
found1 = true
break
case 3:
obj.value = enc[0].decode(buf, offset)
offset += enc[0].decode.bytes
obj.id = enc[2].decode(buf, offset)
offset += enc[2].decode.bytes
break
case 4:
obj.roundtripToken = enc[0].decode(buf, offset)
offset += enc[0].decode.bytes
obj.target = enc[2].decode(buf, offset)
offset += enc[2].decode.bytes
break
case 5:
obj.closerNodes = enc[2].decode(buf, offset)
offset += enc[2].decode.bytes
break
case 6:
obj.roundtripToken = enc[2].decode(buf, offset)
offset += enc[2].decode.bytes
break
case 7:
obj.command = enc[3].decode(buf, offset)
offset += enc[3].decode.bytes
break
case 8:
obj.error = enc[3].decode(buf, offset)
offset += enc[3].decode.bytes
break
case 9:
obj.value = enc[2].decode(buf, offset)
offset += enc[2].decode.bytes
break
default:
offset = skip(prefix & 7, buf, offset)

264
lib/query-stream.js

@ -0,0 +1,264 @@
const { Readable } = require('stream')
const nodes = require('ipv4-peers').idLength(32)
const QueryTable = require('./query-table')
const BOOTSTRAPPING = Symbol('BOOTSTRAPPING')
const MOVING_CLOSER = Symbol('MOVING_CLOSER')
const UPDATING = Symbol('UPDATING')
const FINALIZED = Symbol('FINALIZED')
class QueryStream extends Readable {
constructor (node, command, target, value, opts) {
if (!opts) opts = {}
if (!opts.concurrency) opts.concurrency = opts.highWaterMark || node.concurrency
super({
objectMode: true,
highWaterMark: opts.concurrency
})
const cmd = node._commands.get(command)
this.command = command
this.target = target
this.value = cmd ? cmd.inputEncoding.encode(value) : (value || null)
this.update = !!opts.update
this.query = !!opts.query || !opts.update
this.destroyed = false
this.inflight = 0
this.responses = 0
this.updates = 0
this.table = opts.table || new QueryTable(node.id, target)
node.inflightQueries++
this._status = opts.table ? MOVING_CLOSER : BOOTSTRAPPING
this._node = node
this._concurrency = opts.concurrency
this._callback = this._onresponse.bind(this)
this._map = identity
this._outputEncoding = cmd ? cmd.outputEncoding : null
}
map (fn) {
this._map = fn
return this
}
_onresponse (err, message, peer, request, to, type) {
this.inflight--
if (err && to.id) {
// Request, including retries, failed completely
// Remove the "to" node.
const node = this._node.bucket.get(to.id)
if (node) this._node._removeNode(node)
}
if (this._status === FINALIZED) {
if (!this.inflight) {
if (this.destroyed) this.emit('close')
else this.destroy()
}
return
}
if (err) {
this.emit('warning', err)
this._readMaybe()
return
}
this.responses++
this.table.addVerified(message, peer)
if (this._status === MOVING_CLOSER) {
const candidates = decodeNodes(message.closerNodes)
for (var i = 0; i < candidates.length; i++) {
this.table.addUnverified(candidates[i], peer)
}
} else if (this._status === UPDATING) {
this.updates++
}
if (message.error) {
this.emit('warning', new Error(message.error))
this._readMaybe()
return
}
if (!this.query && this._status === MOVING_CLOSER) {
this._readMaybe()
return
}
const value = this._outputEncoding
? this._decodeOutput(message.value)
: message.value
const data = this._map({
type,
node: {
id: message.id,
port: peer.port,
host: peer.host
},
value
})
if (!data) {
this._readMaybe()
return
}
this.push(data)
}
_decodeOutput (val) {
try {
return val && this._outputEncoding.decode(val)
} catch (err) {
return null
}
}
_bootstrap () {
const table = this.table
const bootstrap = this._node.bucket.closest(table.target, table.k)
var i = 0
for (; i < bootstrap.length; i++) {
const b = bootstrap[i]
const node = { id: b.id, port: b.port, host: b.host }
table.addUnverified(node, null)
}
const bootstrapNodes = this._node.bootstrapNodes
if (bootstrap.length < bootstrapNodes.length) {
for (i = 0; i < bootstrapNodes.length; i++) {
this._send(bootstrapNodes[i], true, false)
}
}
this._status = MOVING_CLOSER
this._moveCloser()
}
_sendAll (nodes, force, sendToken) {
var free = Math.max(0, this._concurrency - this._node._io.inflight.length)
var sent = 0
if (!free && !this.inflight) free = 1
if (!free) return 0
for (var i = 0; i < nodes.length; i++) {
if (this._send(nodes[i], force, sendToken)) {
if (++sent === free) break
}
}
return sent
}
_send (node, force, isUpdate) {
if (!force) {
if (node.queried) return false
node.queried = true
}
this.inflight++
const io = this._node._io
if (isUpdate) {
if (!node.roundtripToken) return this._callback(new Error('Roundtrip token is required'))
io.update(this.command, this.target, this.value, node, this._callback)
} else if (this.query) {
io.query(this.command, this.target, this.value, node, this._callback)
} else {
io.query('_find_node', this.target, null, node, this._callback)
}
return true
}
_sendUpdate () {
const sent = this._sendAll(this.table.closest, false, true)
if (sent || this.inflight) return
this._finalize()
}
_moveCloser () {
const table = this.table
const sent = this._sendAll(table.unverified, false, false)
if (sent || this.inflight) return
if (this.update) {
for (var i = 0; i < table.closest.length; i++) {
table.closest[i].queried = false
}
this._status = UPDATING
this._sendUpdate()
} else {
this._finalize()
}
}
_finalize () {
const status = this._status
if (status === FINALIZED) return
this._status = FINALIZED
this._node.inflightQueries--
if (!this.responses && !this.destroyed) {
this.destroy(new Error('No nodes responded'))
}
if (status === UPDATING && !this.updates && !this.destroyed) {
this.destroy(new Error('No close nodes responded'))
}
this.push(null)
}
_readMaybe () {
if (!this.inflight || this._readableState.flowing === true) this._read()
}
_read () {
if (this._node.destroyed) return
switch (this._status) {
case BOOTSTRAPPING: return this._bootstrap()
case MOVING_CLOSER: return this._moveCloser()
case UPDATING: return this._sendUpdate()
case FINALIZED: return
}
throw new Error('Unknown status: ' + this._status)
}
destroy (err) {
if (this.destroyed) return
this.destroyed = true
if (err) this.emit('error', err)
this._finalize()
if (!this.inflight) this.emit('close')
}
}
module.exports = QueryStream
function decodeNodes (buf) {
if (!buf) return []
try {
return nodes.decode(buf)
} catch (err) {
return []
}
}
function identity (a) {
return a
}

69
lib/query-table.js

@ -0,0 +1,69 @@
const xor = require('xor-distance')
class QueryTable {
constructor (id, target) {
this.k = 20
this.id = id
this.target = target
this.closest = []
this.unverified = []
}
addUnverified (node, referrer) {
if (node.id.equals(this.id)) return
node.distance = xor(this.target, node.id)
node.referrer = referrer
insertSorted(node, this.k, this.unverified)
}
addVerified (message, peer) {
if (!message.id || !message.roundtripToken || message.id.equals(this.id)) {
return
}
var prev = getNode(message.id, this.unverified)
if (!prev) {
prev = {
id: message.id,
host: peer.host,
port: peer.port,
distance: xor(message.id, this.target)
}
}
prev.roundtripToken = message.roundtripToken
insertSorted(prev, this.k, this.closest)
}
}
module.exports = QueryTable
function getNode (id, list) {
// find id in the list.
// technically this would be faster with binary search (against distance)
// but this list is always small, so meh
for (var i = 0; i < list.length; i++) {
if (list[i].id.equals(id)) return list[i]
}
return null
}
function insertSorted (node, max, list) {
if (list.length === max && !xor.lt(node.distance, list[max - 1].distance)) return
if (getNode(node.id, list)) return
if (list.length < max) list.push(node)
else list[max - 1] = node
var pos = list.length - 1
while (pos && xor.gt(list[pos - 1].distance, node.distance)) {
list[pos] = list[pos - 1]
list[pos - 1] = node
pos--
}
}

45
package.json

@ -1,38 +1,35 @@
{
"name": "dht-rpc",
"version": "3.0.1",
"description": "Make RPC calls over a Kademlia based DHT.",
"description": "Make RPC calls over a Kademlia based DHT",
"main": "index.js",
"dependencies": {
"duplexify": "^3.5.0",
"inherits": "^2.0.3",
"ipv4-peers": "^1.1.1",
"k-bucket": "^5.0.0",
"protocol-buffers-encodings": "^1.1.0",
"readable-stream": "^2.1.5",
"sodium-universal": "^2.0.0",
"stream-collector": "^1.0.1",
"time-ordered-set": "^1.0.1",
"udp-request": "^1.3.0",
"xor-distance": "^1.0.0"
},
"devDependencies": {
"protocol-buffers": "^3.2.1",
"standard": "^11.0.1",
"tape": "^4.9.0"
"scripts": {
"test": "standard && tape test.js",
"protobuf": "protocol-buffers schema.proto -o lib/messages.js"
},
"repository": {
"type": "git",
"url": "https://github.com/mafintosh/dht-rpc.git"
},
"scripts": {
"test": "standard && tape test.js",
"protobuf": "protocol-buffers schema.proto -o messages.js"
"url": "git+https://github.com/mafintosh/dht-rpc.git"
},
"author": "Mathias Buus (@mafintosh)",
"license": "MIT",
"bugs": {
"url": "https://github.com/mafintosh/dht-rpc/issues"
},
"homepage": "https://github.com/mafintosh/dht-rpc"
"homepage": "https://github.com/mafintosh/dht-rpc#readme",
"devDependencies": {
"protocol-buffers": "^4.1.0",
"standard": "^12.0.1",
"tape": "^4.9.1"
},
"dependencies": {
"codecs": "^1.2.1",
"ipv4-peers": "^1.1.1",
"k-bucket": "^5.0.0",
"protocol-buffers-encodings": "^1.1.0",
"sodium-universal": "^2.0.0",
"stream-collector": "^1.0.1",
"time-ordered-set": "^1.0.1",
"xor-distance": "^1.0.0"
}
}

304
query-stream.js

@ -1,304 +0,0 @@
var stream = require('readable-stream')
var inherits = require('inherits')
var nodes = require('ipv4-peers').idLength(32)
var xor = require('xor-distance')
var BLANK = Buffer.from([
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
])
module.exports = QueryStream
function QueryStream (dht, query, opts) {
if (!(this instanceof QueryStream)) return new QueryStream(dht, query, opts)
if (!opts) opts = {}
if (!opts.concurrency) opts.concurrency = opts.highWaterMark || dht.concurrency
if (!query.target) throw new Error('query.target is required')
stream.Readable.call(this, {objectMode: true, highWaterMark: opts.concurrency})
var self = this
var nodes = opts.node || opts.nodes
this.query = query
this.query.id = dht._queryId
this.target = query.target
this.token = !!opts.token
this.holepunching = opts.holepunching !== false
this.commits = 0
this.responses = 0
this.errors = 0
this.destroyed = false
this.verbose = !!opts.verbose
dht.inflightQueries++
this._dht = dht
this._committing = false
this._finalized = false
this._closest = opts.closest || []
this._concurrency = opts.concurrency
this._updating = false
this._pending = nodes ? [].concat(nodes).map(copyNode) : []
this._k = nodes ? Infinity : opts.k || 20
this._inflight = 0
this._moveCloser = !nodes
this._map = opts.map || echo
this._bootstrapped = !this._moveCloser
this._onresponse = onresponse
this._onresponseholepunch = onresponseholepunch
function onresponseholepunch (err, res, peer, query) {
if (!err || !peer || !peer.referrer) self._callback(err, res, peer)
else self._holepunch(peer, query)
}
function onresponse (err, res, peer) {
self._callback(err, res, peer)
}
}
inherits(QueryStream, stream.Readable)
QueryStream.prototype.destroy = function (err) {
if (this.destroyed) return
this.destroyed = true
this._finalize()
if (err) this.emit('error', err)
this.emit('close')
}
QueryStream.prototype._finalize = function () {
if (this._finalized) return
this._finalized = true
this._dht.inflightQueries--
if (!this.responses && !this.destroyed) this.destroy(new Error('No nodes responded'))
if (!this.commits && this._committing && !this.destroyed) this.destroy(new Error('No close nodes responded'))
this.push(null)
}
QueryStream.prototype._bootstrap = function () {
this._bootstrapped = true
var bootstrap = this._dht.bucket.closest(this.target, this._k)
var i = 0
for (i = 0; i < bootstrap.length; i++) {
var b = bootstrap[i]
this._addPending({id: b.id, port: b.port, host: b.host}, null)
}
if (bootstrap.length < this._dht._bootstrap.length) {
for (i = 0; i < this._dht._bootstrap.length; i++) {
this._send(this._dht._bootstrap[i], true, false)
}
}
}
QueryStream.prototype._readMaybe = function () {
if (this._readableState.flowing === true) this._read()
}
QueryStream.prototype._sendTokens = function () {
if (this.destroyed) return
var sent = this._sendAll(this._closest, false, true)
if (sent || this._inflight) return
this._finalize()
}
QueryStream.prototype._sendPending = function () {
if (this.destroyed) return
if (!this._bootstrapped) this._bootstrap()
var sent = this._sendAll(this._pending, false, false)
if (sent || this._inflight) return
if (this.token) {
for (var i = 0; i < this._closest.length; i++) this._closest[i].queried = false
this._committing = true
this._sendTokens()
} else {
this._finalize()
}
}
QueryStream.prototype._read = function () {
if (this._committing) this._sendTokens()
else this._sendPending()
}
QueryStream.prototype._holepunch = function (peer, query) {
var self = this
this._dht._holepunch(peer, peer.referrer, function (err) {
if (err) return self._callback(err, null, peer)
self._dht._request(query, peer, false, self._onresponse)
})
}
QueryStream.prototype._callback = function (err, res, peer) {
this._inflight--
if (this.destroyed) return
if (err) {
if (res && res.id) {
var node = this._dht.bucket.get(res.id)
if (node) this._dht._removeNode(node)
}
this.errors++
this.emit('warning', err)
this._readMaybe()
return
}
this.responses++
if (this._committing) this.commits++
this._addClosest(res, peer)
if (this._moveCloser) {
var candidates = decodeNodes(res.nodes)
for (var i = 0; i < candidates.length; i++) this._addPending(candidates[i], peer)
}
if (!validateId(res.id) || (this.token && !this.verbose && !this._committing)) {
this._readMaybe()
return
}
var data = this._map({
node: {
id: res.id,
port: peer.port,
host: peer.host
},
value: res.value
})
if (!data) {
this._readMaybe()
return
}
this.push(data)
}
QueryStream.prototype._sendAll = function (nodes, force, useToken) {
var sent = 0
var free = Math.max(0, this._concurrency - this._dht.socket.inflight)
if (!free && !this._inflight) free = 1
if (!free) return 0
for (var i = 0; i < nodes.length; i++) {
if (this._send(nodes[i], force, useToken)) {
if (++sent === free) break
}
}
return sent
}
QueryStream.prototype._send = function (node, force, useToken) {
if (!force) {
if (node.queried) return false
node.queried = true
}
this._inflight++
var query = this.query
if (useToken && node.roundtripToken) {
query = {
command: this.query.command,
id: this.query.id,
target: this.query.target,
value: this.query.value,
roundtripToken: node.roundtripToken
}
}
this._dht._request(query, node, false, this.holepunching ? this._onresponseholepunch : this._onresponse)
return true
}
QueryStream.prototype._addPending = function (node, ref) {
if (node.id.equals(this._dht.id)) return
node.distance = xor(this.target, node.id)
node.referrer = ref
insertSorted(node, this._k, this._pending)
}
QueryStream.prototype._addClosest = function (res, peer) {
if (!res.id || !res.roundtripToken || res.id.equals(this._dht.id)) return
var prev = getNode(res.id, this._pending)
if (!prev) {
prev = {
id: res.id,
port: peer.port,
host: peer.host,
distance: xor(res.id, this.target)
}
}
prev.roundtripToken = res.roundtripToken
insertSorted(prev, this._k, this._closest)
}
function decodeNodes (buf) {
if (!buf) return []
try {
return nodes.decode(buf)
} catch (err) {
return []
}
}
function getNode (id, list) {
// find id in the list.
// technically this would be faster with binary search (against distance)
// but this list is always small, so meh
for (var i = 0; i < list.length; i++) {
if (list[i].id.equals(id)) return list[i]
}
return null
}
function validateId (id) {
return id && id.length === 32
}
function insertSorted (node, max, list) {
if (list.length === max && !xor.lt(node.distance, list[max - 1].distance)) return
if (getNode(node.id, list)) return
if (list.length < max) list.push(node)
else list[max - 1] = node
var pos = list.length - 1
while (pos && xor.gt(list[pos - 1].distance, node.distance)) {
list[pos] = list[pos - 1]
list[pos - 1] = node
pos--
}
}
function copyNode (node) {
return {
id: node.id || BLANK,
port: node.port,
host: node.host,
roundtripToken: node.roundtripToken,
referrer: node.referrer || node.referer
}
}
function echo (a) {
return a
}

37
schema.proto

@ -1,16 +1,27 @@
message Request {
required string command = 1;
optional bytes id = 2;
optional bytes target = 3;
optional bytes forwardRequest = 4;
optional bytes forwardResponse = 5;
optional bytes roundtripToken = 6;
optional bytes value = 7;
message Holepunch {
optional bytes from = 2;
optional bytes to = 3;
}
enum TYPE {
QUERY = 1;
UPDATE = 2;
RESPONSE = 3;
}
message Response {
optional bytes id = 1;
optional bytes nodes = 2;
optional bytes value = 3;
optional bytes roundtripToken = 4;
message Message {
// request/response type + id
required TYPE type = 1;
required uint64 rid = 2;
// kademlia stuff
optional bytes id = 3;
optional bytes target = 4;
optional bytes closerNodes = 5;
optional bytes roundtripToken = 6;
// rpc stuff
optional string command = 7;
optional string error = 8;
optional bytes value = 9;
}

249
test.js

@ -1,26 +1,25 @@
var tape = require('tape')
var dht = require('./')
var blake2b = require('./blake2b')
const tape = require('tape')
const dht = require('./')
const blake2b = require('./lib/blake2b')
tape('simple update', function (t) {
bootstrap(function (port, node) {
var a = dht({bootstrap: port})
var b = dht({bootstrap: port})
a.on('update:echo', function (data, callback) {
t.ok(data.roundtripToken, 'has roundtrip token')
t.same(data.value, Buffer.from('Hello, World!'), 'expected data')
callback(null, data.value)
const a = dht({ bootstrap: port })
const b = dht({ bootstrap: port })
a.command('echo', {
query (data, callback) {
t.fail('should not query')
callback(new Error('nope'))
},
update (data, callback) {
t.same(data.value, Buffer.from('Hello, World!'), 'expected data')
callback(null, data.value)
}
})
a.ready(function () {
var data = {
command: 'echo',
target: a.id,
value: Buffer.from('Hello, World!')
}
b.update(data, function (err, responses) {
b.update('echo', a.id, Buffer.from('Hello, World!'), function (err, responses) {
a.destroy()
b.destroy()
node.destroy()
@ -36,21 +35,18 @@ tape('simple update', function (t) {
tape('simple query', function (t) {
bootstrap(function (port, node) {
var a = dht({bootstrap: port})
var b = dht({bootstrap: port})
const a = dht({ bootstrap: port })
const b = dht({ bootstrap: port })
a.on('query:hello', function (data, callback) {
t.same(data.value, null, 'expected data')
callback(null, Buffer.from('world'))
a.command('hello', {
query (data, callback) {
t.same(data.value, null, 'expected data')
callback(null, Buffer.from('world'))
}
})
a.ready(function () {
var data = {
command: 'hello',
target: a.id
}
b.query(data, function (err, responses) {
b.query('hello', a.id, function (err, responses) {
a.destroy()
b.destroy()
node.destroy()
@ -64,91 +60,34 @@ tape('simple query', function (t) {
})
})
tape('targeted query', function (t) {
bootstrap(function (port, node) {
var a = dht({bootstrap: port})
a.on('query:echo', function (data, cb) {
t.pass('in echo')
cb(null, data.value)
})
var b = dht({bootstrap: port})
b.on('query:echo', function (data, cb) {
t.fail('should not hit me')
cb()
})
a.ready(function () {
b.ready(function () {
var client = dht({bootstrap: port})
client.query({
command: 'echo',
value: Buffer.from('hi'),
target: client.id
}, {
node: {
port: a.address().port,
host: '127.0.0.1'
}
}, function (err, responses) {
client.destroy()
a.destroy()
b.destroy()
node.destroy()
t.error(err, 'no error')
t.same(responses.length, 1, 'one response')
t.same(responses[0].value, Buffer.from('hi'), 'echoed')
t.end()
})
})
})
})
})
tape('targeted update', function (t) {
tape('query and update', function (t) {
bootstrap(function (port, node) {
var a = dht({bootstrap: port})
a.on('update:echo', function (data, cb) {
t.pass('in echo')
cb(null, data.value)
})
var b = dht({bootstrap: port})
b.on('update:echo', function (data, cb) {
t.fail('should not hit me')
cb()
const a = dht({ bootstrap: port })
const b = dht({ bootstrap: port })
a.command('hello', {
query (data, callback) {
t.same(data.value, null, 'expected query data')
callback(null, Buffer.from('world'))
},
update (data, callback) {
t.same(data.value, null, 'expected update data')
callback(null, Buffer.from('world'))
}
})
a.ready(function () {
b.ready(function () {
var client = dht({bootstrap: port})
client.update({
command: 'echo',
value: Buffer.from('hi'),
target: client.id
}, {
node: {
port: a.address().port,
host: '127.0.0.1'
}
}, function (err, responses) {
client.destroy()
a.destroy()
b.destroy()
node.destroy()
b.queryAndUpdate('hello', a.id, function (err, responses) {
a.destroy()
b.destroy()
node.destroy()
t.error(err, 'no error')
t.same(responses.length, 1, 'one response')
t.same(responses[0].value, Buffer.from('hi'), 'echoed')
t.end()
})
t.error(err, 'no errors')
t.same(responses.length, 2, 'two responses')
t.same(responses[0].value, Buffer.from('world'), 'responded')
t.same(responses[1].value, Buffer.from('world'), 'responded')
t.ok(responses[0].type !== responses[1].type, 'not the same type')
t.end()
})
})
})
@ -156,22 +95,23 @@ tape('targeted update', function (t) {
tape('swarm query', function (t) {
bootstrap(function (port, node) {
var swarm = []
const swarm = []
var closest = 0
loop()
function done () {
t.pass('created swarm')
var key = blake2b(Buffer.from('hello'))
var me = dht({bootstrap: port})
me.update({command: 'kv', target: key, value: Buffer.from('hello')}, function (err, responses) {
const key = blake2b(Buffer.from('hello'))
const me = dht({ bootstrap: port })
me.update('kv', key, Buffer.from('hello'), function (err, responses) {
t.error(err, 'no error')
t.same(closest, 20, '20 closest nodes')
t.same(responses.length, 20, '20 responses')
var stream = me.query({command: 'kv', target: key})
const stream = me.query('kv', key)
stream.on('data', function (data) {
if (data.value) {
@ -190,18 +130,20 @@ tape('swarm query', function (t) {
function loop () {
if (swarm.length === 256) return done()
var node = dht({bootstrap: port})
const node = dht({ bootstrap: port })
swarm.push(node)
var value = null
node.on('update:kv', function (data, cb) {
closest++
value = data.value
cb()
})
node.on('query:kv', function (data, cb) {
cb(null, value)
node.command('kv', {
update (data, cb) {
closest++
value = data.value
cb()
},
query (data, cb) {
cb(null, value)
}
})
node.ready(loop)
@ -209,12 +151,73 @@ tape('swarm query', function (t) {
})
})
tape('holepunch api', function (t) {
bootstrap(function (port, node) {
const a = dht({ bootstrap: port })
const b = dht({ bootstrap: port })
var holepunched = false
a.ready(function () {
b.ready(function () {
node.on('holepunch', function (from, to) {
t.same(from.port, a.address().port)
t.same(to.port, b.address().port)
holepunched = true
})
a.holepunch({
host: '127.0.0.1',
port: b.address().port,
referrer: {
host: '127.0.0.1',
port: node.address().port
}
}, function (err) {
t.error(err, 'no error')
t.ok(holepunched)
t.end()
node.destroy()
a.destroy()
b.destroy()
})
})
})
})
})
tape('timeouts', function (t) {
bootstrap(function (port, node) {
const a = dht({ bootstrap: port, ephemeral: true })
const b = dht({ bootstrap: port })
var tries = 0
b.command('nope', {
update (query, cb) {
tries++
t.pass('ignoring update')
}
})
b.ready(function () {
a.update('nope', Buffer.alloc(32), function (err) {
t.ok(err, 'errored')
t.same(tries, 3)
t.end()
node.destroy()
a.destroy()
b.destroy()
})
})
})
})
function bootstrap (done) {
var node = dht({
const node = dht({
ephemeral: true
})
node.listen(function () {
node.listen(0, function () {
done(node.address().port, node)
})
}

Loading…
Cancel
Save