Browse Source

move commands to enums

session-estimator
Mathias Buus 3 years ago
parent
commit
96c8dc0d1a
  1. 22
      README.md
  2. 118
      index.js
  3. 4
      lib/commands.js
  4. 22
      lib/io.js
  5. 8
      lib/query.js
  6. 51
      test.js

22
README.md

@ -51,9 +51,10 @@ function createNode () {
})
const values = new Map()
const VALUES = 0 // define a command enum
node.on('request', function (req) {
if (req.command === 'values') {
if (req.command === VALUES) {
if (req.token) { // if we are the closest node store the value (ie the node sent a valid roundtrip token)
const key = hash(req.value).toString('hex')
values.set(key, req.value)
@ -79,7 +80,7 @@ const node = new DHT()
const q = node.query({
target: hash(val),
command: 'values',
command: VALUES,
value
}, {
// commit true will make the query re-reuqest the 20 closest
@ -94,7 +95,7 @@ Then after inserting run this script to query for a value
``` js
const target = Buffer.from(hexFromAbove, 'hex')
for await (const data of node.query({ target, command: 'values' })) {
for await (const data of node.query({ target, command: VALUES })) {
if (data.value && hash(data.value).toString('hex') === hexFromAbove) {
// We found the value! Destroy the query stream as there is no need to continue.
console.log(val, '-->', data.value.toString())
@ -218,7 +219,7 @@ mode always uses a random port.
Emitted when an incoming DHT request is received. This is where you can add your own RPC methods.
* `req.target` - the dht target the peer is looking (routing is handled behind the scene)
* `req.command` - the RPC command name
* `req.command` - the RPC command enum
* `req.value` - the RPC value buffer
* `req.token` - If the remote peer echoed back a valid roundtrip token, proving their "from address" this is set
* `req.from` - who sent this request (host, port)
@ -236,17 +237,10 @@ DHT.ERROR_UNKNOWN_COMMAND = 1 // the command requested does not exist
DHT.ERROR_INVALID_TOKEN = 2 // the round trip token sent is invalid
```
The DHT has a couple of built in commands for bootstrapping and general DHT health management.
Those are:
* `find_node` - Find the closest DHT nodes to a specific target with no side-effects.
* `ping` - Ping another node to see if it is alive.
* `ping_nat` - Ping another node, but have it reply on a different UDP session to see if you are firewalled.
* `down_hint` - Gossiped internally to hint that a specific node might be down.
#### `reply = await node.request({ token, target, command, value }, to, [options])`
Send a request to a specific node specified by the to address (`{ host, port }`).
See the query API for more info on the arguments.
Options include:
@ -267,8 +261,8 @@ Sugar for `dht.request({ command: 'ping' }, to)`
Query the DHT. Will move as close as possible to the `target` provided, which should be a 32-byte uniformly distributed buffer (ie a hash).
* `target` - find nodes close to this
* `command` - the method you want to invoke
* `target` - find nodes close to this (should be a 32 byte buffer like a hash)
* `command` - an enum (uint) indicating the method you want to invoke
* `value` - optional binary payload to send with it
If you want to modify state stored in the dht, you can use the commit flag to signal the closest

118
index.js

@ -9,6 +9,7 @@ const IO = require('./lib/io')
const Query = require('./lib/query')
const peer = require('./lib/peer')
const { UNKNOWN_COMMAND, INVALID_TOKEN } = require('./lib/errors')
const { PING, PING_NAT, FIND_NODE, DOWN_HINT } = require('./lib/commands')
const TMP = Buffer.allocUnsafe(32)
const TICK_INTERVAL = 5000
@ -121,19 +122,30 @@ class DHT extends EventEmitter {
return this._bootstrapping
}
findNode (target, opts) {
if (this.destroyed) throw new Error('Node destroyed')
this._refreshTicks = REFRESH_TICKS
return new Query(this, target, true, FIND_NODE, null, opts)
}
query ({ target, command, value }, opts) {
if (this.destroyed) throw new Error('Node destroyed')
this._refreshTicks = REFRESH_TICKS
return new Query(this, target, command, value || null, opts)
return new Query(this, target, false, command, value || null, opts)
}
ping (to) {
return this.request({ token: null, command: 'ping', target: null, value: null }, to)
ping ({ host, port }, opts) {
const req = this.io.createRequest({ id: null, host, port }, null, true, PING, null, null)
return this._requestToPromise(req, opts)
}
request ({ token = null, command, target = null, value = null }, { host, port }, opts) {
const req = this.io.createRequest({ id: null, host, port }, token, command, target, value)
if (req === null) throw new Error('Node destroyed')
const req = this.io.createRequest({ id: null, host, port }, token, false, command, target, value)
return this._requestToPromise(req, opts)
}
_requestToPromise (req, opts) {
if (req === null) return Promise.reject(new Error('Node destroyed'))
if (opts && opts.socket) req.socket = opts.socket
if (opts && opts.retry === false) req.retries = 0
@ -161,7 +173,7 @@ class DHT extends EventEmitter {
const onlyFirewall = !this._forcePersistent
for (let i = 0; i < 2; i++) {
await this._backgroundQuery(this.table.id, 'find_node', null).on('data', ondata).finished()
await this._backgroundQuery(this.table.id).on('data', ondata).finished()
if (this.bootstrapped || (!testNat && !this._forcePersistent)) break
if (!(await this._updateNetworkState(onlyFirewall))) break
@ -185,14 +197,13 @@ class DHT extends EventEmitter {
const value = Buffer.allocUnsafe(2)
c.uint16.encode({ start: 0, end: 2, buffer: value }, self.io.serverSocket.address().port)
self.request({ token: null, command: 'ping_nat', target: null, value }, data.from)
.then(() => { testNat = true }, noop)
self._request(data.from, true, PING_NAT, null, value, () => { testNat = true }, noop)
}
}
refresh () {
const node = this.table.random()
this._backgroundQuery(node ? node.id : this.table.id, 'find_node', null)
this._backgroundQuery(node ? node.id : this.table.id)
}
destroy () {
@ -201,8 +212,8 @@ class DHT extends EventEmitter {
return this.io.destroy()
}
_request (to, command, target, value, onresponse, onerror) {
const req = this.io.createRequest(to, null, command, target, value)
_request (to, internal, command, target, value, onresponse, onerror) {
const req = this.io.createRequest(to, null, internal, command, target, value)
if (req === null) return null
req.onresponse = onresponse
@ -323,7 +334,7 @@ class DHT extends EventEmitter {
oldNode.pinged = this._tick
this._repinging++
this._request({ id: null, host: oldNode.host, port: oldNode.port }, 'ping', null, null, onsuccess, onswap)
this._request({ id: null, host: oldNode.host, port: oldNode.port }, true, PING, null, null, onsuccess, onswap)
function onsuccess (m) {
if (oldNode.seen <= lastSeen) return onswap()
@ -342,40 +353,45 @@ class DHT extends EventEmitter {
this._addNodeFromNetwork(!external, req.from, req.to)
}
// standard keep alive call
if (req.command === 'ping') {
req.sendReply(0, null, false, false)
return
}
// check if the other side can receive a message to their other socket
if (req.command === 'ping_nat') {
if (req.value === null || req.value.byteLength < 2) return
const port = c.uint16.decode({ start: 0, end: 2, buffer: req.value })
if (port === 0) return
req.from.port = port
req.sendReply(0, null, false, false)
return
}
// empty dht reply back
if (req.command === 'find_node') {
if (!req.target) return
req.sendReply(0, null, false, true)
return
}
if (req.command === 'down_hint') {
if (req.value === null || req.value.byteLength < 6) return
if (this._checks < 10) {
sodium.crypto_generichash(TMP, req.value.subarray(0, 6))
const node = this.table.get(TMP)
if (node && (node.pinged < this._tick || node.downHints === 0)) {
node.downHints++
this._check(node)
if (req.internal) {
switch (req.command) {
// standard keep alive call
case PING: {
req.sendReply(0, null, false, false)
return
}
// check if the other side can receive a message to their other socket
case PING_NAT: {
if (req.value === null || req.value.byteLength < 2) return
const port = c.uint16.decode({ start: 0, end: 2, buffer: req.value })
if (port === 0) return
req.from.port = port
req.sendReply(0, null, false, false)
return
}
// empty dht reply back
case FIND_NODE: {
if (!req.target) return
req.sendReply(0, null, false, true)
return
}
// "this is node you sent me is down" - let's try to ping it
case DOWN_HINT: {
if (req.value === null || req.value.byteLength < 6) return
if (this._checks < 10) {
sodium.crypto_generichash(TMP, req.value.subarray(0, 6))
const node = this.table.get(TMP)
if (node && (node.pinged < this._tick || node.downHints === 0)) {
node.downHints++
this._check(node)
}
}
req.sendReply(0, null, false, false)
return
}
}
req.sendReply(0, null, false, false)
req.sendReply(UNKNOWN_COMMAND, null, false, req.target !== null)
return
}
@ -435,7 +451,7 @@ class DHT extends EventEmitter {
}
this._checks++
this._request({ id: null, host: node.host, port: node.port }, 'ping', null, null, onresponse, onerror)
this._request({ id: null, host: node.host, port: node.port }, true, PING, null, null, onresponse, onerror)
}
_ontick () {
@ -576,7 +592,7 @@ class DHT extends EventEmitter {
// double check they actually came on the server socket...
this.io.serverSocket.on('message', onmessage)
const pongs = await requestAll(this, 'ping_nat', value, nodes)
const pongs = await requestAll(this, true, PING_NAT, value, nodes)
if (!pongs.length) return true
let count = 0
@ -606,11 +622,11 @@ class DHT extends EventEmitter {
}
}
_backgroundQuery (target, command, value) {
_backgroundQuery (target) {
this._refreshTicks = REFRESH_TICKS
const backgroundCon = Math.min(this.concurrency, Math.max(2, (this.concurrency / 8) | 0))
const q = new Query(this, target, command, value, { concurrency: backgroundCon, maxSlow: 0 })
const q = new Query(this, target, true, FIND_NODE, null, { concurrency: backgroundCon, maxSlow: 0 })
q.on('data', () => {
// yield to other traffic
@ -651,14 +667,14 @@ function randomOffset (n) {
return n - ((Math.random() * 0.5 * n) | 0)
}
function requestAll (dht, command, value, nodes, opts) {
function requestAll (dht, internal, command, value, nodes) {
let missing = nodes.length
const replies = []
return new Promise((resolve) => {
for (const node of nodes) {
dht.request({ token: null, command, target: null, value }, node, opts)
.then(onsuccess, onerror)
const req = dht._request(node, internal, command, null, value, onsuccess, onerror)
if (!req) return resolve(replies)
}
function onsuccess (res) {

4
lib/commands.js

@ -0,0 +1,4 @@
exports.PING = 0
exports.PING_NAT = 1
exports.FIND_NODE = 2
exports.DOWN_HINT = 3

22
lib/io.js

@ -179,7 +179,7 @@ module.exports = class IO {
}
}
createRequest (to, token, command, target, value) {
createRequest (to, token, internal, command, target, value) {
if (this._destroying !== null) return null
if (this._tid === 65536) this._tid = 0
@ -187,14 +187,14 @@ module.exports = class IO {
const tid = this._tid++
const socket = this.firewalled ? this.clientSocket : this.serverSocket
const req = new Request(this, socket, tid, null, to, token, command, target, value)
const req = new Request(this, socket, tid, null, to, token, internal, command, target, value)
this.inflight.push(req)
return req
}
}
class Request {
constructor (io, socket, tid, from, to, token, command, target, value) {
constructor (io, socket, tid, from, to, token, internal, command, target, value) {
this.socket = socket
this.tid = tid
this.from = from
@ -203,6 +203,7 @@ class Request {
this.command = command
this.target = target
this.value = value
this.internal = internal
this.sent = 0
this.retries = 3
this.destroyed = false
@ -223,13 +224,14 @@ class Request {
const to = peer.ipv4.decode(state)
const id = flags & 1 ? c.fixed32.decode(state) : null
const token = flags & 2 ? c.fixed32.decode(state) : null
const command = c.string.decode(state)
const target = flags & 4 ? c.fixed32.decode(state) : null
const value = flags & 8 ? c.buffer.decode(state) : null
const internal = (flags & 4) !== 0
const command = c.uint.decode(state)
const target = flags & 8 ? c.fixed32.decode(state) : null
const value = flags & 16 ? c.buffer.decode(state) : null
if (id !== null) from.id = validateId(id, from)
return new Request(io, socket, tid, from, to, token, command, target, value)
return new Request(io, socket, tid, from, to, token, internal, command, target, value)
} catch {
return null
}
@ -330,14 +332,14 @@ class Request {
if (id) state.end += 32
if (token) state.end += 32
c.string.preencode(state, this.command)
c.uint.preencode(state, this.command)
if (this.target) state.end += 32
if (value) c.buffer.preencode(state, value)
state.buffer = Buffer.allocUnsafe(state.end)
state.buffer[state.start++] = REQUEST_ID
state.buffer[state.start++] = (id ? 1 : 0) | (token ? 2 : 0) | (this.target ? 4 : 0) | (value ? 8 : 0)
state.buffer[state.start++] = (id ? 1 : 0) | (token ? 2 : 0) | (this.internal ? 4 : 0) | (this.target ? 8 : 0) | (value ? 16 : 0)
c.uint16.encode(state, this.tid)
peer.ipv4.encode(state, to)
@ -345,7 +347,7 @@ class Request {
if (id) c.fixed32.encode(state, this._io.table.id)
if (token) c.fixed32.encode(state, token)
c.string.encode(state, this.command)
c.uint.encode(state, this.command)
if (this.target) c.fixed32.encode(state, this.target)
if (value) c.buffer.encode(state, value)

8
lib/query.js

@ -1,16 +1,18 @@
const { Readable } = require('streamx')
const peer = require('./peer')
const { DOWN_HINT } = require('./commands')
const DONE = []
const DOWN = []
module.exports = class Query extends Readable {
constructor (dht, target, command, value, opts = {}) {
constructor (dht, target, internal, command, value, opts = {}) {
super()
this.dht = dht
this.k = this.dht.table.k
this.target = target
this.internal = internal
this.command = command
this.value = value
this.errors = 0
@ -281,7 +283,7 @@ module.exports = class Query extends Readable {
_downHint (node, down) {
const state = { start: 0, end: 6, buffer: Buffer.allocUnsafe(6) }
peer.ipv4.encode(state, down)
this.dht._request(node, 'down_hint', null, state.buffer, noop, noop)
this.dht._request(node, true, DOWN_HINT, null, state.buffer, noop, noop)
}
_pushClosest (m) {
@ -315,7 +317,7 @@ module.exports = class Query extends Readable {
_visit (to) {
this.inflight++
const req = this.dht._request(to, this.command, this.target, this.value, this._onvisitbound, this._onerrorbound)
const req = this.dht._request(to, this.internal, this.command, this.target, this.value, this._onvisitbound, this._onerrorbound)
if (req === null) {
this.destroy(new Error('Node was destroyed'))
return

51
test.js

@ -15,7 +15,7 @@ test('make bigger swarm', async function (t) {
const targetNode = swarm[25]
const target = targetNode.id
let q = swarm[499].query({ command: 'find_node', target })
let q = swarm[499].findNode(target)
let messages = 0
let found = false
@ -30,7 +30,7 @@ test('make bigger swarm', async function (t) {
const replies = q.closestReplies
t.ok(found, 'found target in ' + messages + ' message(s)')
q = swarm[490].query({ command: 'find_node', target }, { nodes: q.closestNodes })
q = swarm[490].findNode(target, { nodes: q.closestNodes })
messages = 0
found = false
@ -44,7 +44,7 @@ test('make bigger swarm', async function (t) {
t.ok(found, 'found target again in ' + messages + ' message(s)')
q = swarm[470].query({ command: 'find_node', target }, { replies })
q = swarm[470].findNode(target, { replies })
messages = 0
found = false
@ -67,24 +67,26 @@ test('make bigger swarm', async function (t) {
test('commit after query', async function (t) {
const swarm = await makeSwarm(100, t)
const BEFORE = 0
const AFTER = 1
let commits = 0
for (const node of swarm) {
node.on('request', function (req) {
if (req.command === 'before') {
if (req.command === BEFORE) {
return req.reply(null)
}
if (req.command === 'after' && req.token) {
if (req.command === AFTER && req.token) {
commits++
return req.reply(null)
}
})
}
const q = swarm[42].query({ command: 'before', target: swarm[0].table.id }, {
const q = swarm[42].query({ command: BEFORE, target: swarm[0].table.id }, {
commit (m, dht, query) {
return dht.request({ command: 'after', target: query.target, token: m.token }, m.from)
return dht.request({ command: AFTER, target: query.target, token: m.token }, m.from)
}
})
@ -97,7 +99,7 @@ test('map query stream', async function (t) {
const swarm = await makeSwarm(10, t)
const expected = []
const q = swarm[0].query({ command: 'find_node', target: swarm[0].table.id }, {
const q = swarm[0].findNode(swarm[0].table.id, {
map (data) {
if (expected.length > 3) return null
expected.push(data.from.id)
@ -117,15 +119,18 @@ test('map query stream', async function (t) {
test('timeouts', async function (t) {
const [, a, b] = await makeSwarm(3, t)
let tries = 0
const NOPE = 52
t.plan(4)
b.on('request', function (req) {
if (req.command === 'nope') {
if (req.command === NOPE) {
tries++
t.pass('ignoring request')
}
})
const q = a.query({ command: 'nope', target: Buffer.alloc(32) })
const q = a.query({ command: NOPE, target: Buffer.alloc(32) })
await q.finished()
t.is(tries, 3)
@ -134,16 +139,17 @@ test('timeouts', async function (t) {
test('request with/without retries', async function (t) {
const [, a, b] = await makeSwarm(3, t)
let tries = 0
const NOPE = 442
b.on('request', function (req) {
if (req.command === 'nope') {
if (req.command === NOPE) {
tries++
t.pass('ignoring request')
}
})
try {
await a.request({ command: 'nope' }, { host: '127.0.0.1', port: b.address().port })
await a.request({ command: NOPE }, { host: '127.0.0.1', port: b.address().port })
} catch {
// do nothing
}
@ -151,7 +157,7 @@ test('request with/without retries', async function (t) {
t.is(tries, 3)
try {
await a.request({ command: 'nope' }, { host: '127.0.0.1', port: b.address().port }, { retry: false })
await a.request({ command: NOPE }, { host: '127.0.0.1', port: b.address().port }, { retry: false })
} catch {
// do nothing
}
@ -172,7 +178,7 @@ test('reply onflush', async function (t) {
})
})
await a.request({ command: 'hello' }, { host: '127.0.0.1', port: b.address().port })
await a.request({ command: 42 }, { host: '127.0.0.1', port: b.address().port })
t.ok(flushed)
})
@ -190,7 +196,7 @@ test('shorthand commit', async function (t) {
})
}
const q = swarm[0].query({ command: 'hello', target: Buffer.alloc(32) }, { commit: true })
const q = swarm[0].query({ command: 42, target: Buffer.alloc(32) }, { commit: true })
await q.finished()
@ -218,15 +224,16 @@ test('after ready it is always bound', async function (t) {
test('timeouts when commiting', async function (t) {
const [, a, b] = await makeSwarm(3, t)
let tries = 0
const NOPE = 41
b.on('request', function (req) {
if (req.command === 'nope') {
if (req.command === NOPE) {
tries++
t.pass('ignoring request')
}
})
const q = a.query({ command: 'nope', target: Buffer.alloc(32) }, { commit: true })
const q = a.query({ command: NOPE, target: Buffer.alloc(32) }, { commit: true })
let error = null
try {
@ -266,7 +273,7 @@ test('addNode / nodes option', async function (t) {
t.alike(bNodes, [{ host: '127.0.0.1', port: a.address().port }])
const responses = []
for await (const data of b.query({ command: 'hello', target: a.id })) {
for await (const data of b.query({ command: 52, target: a.id })) {
responses.push(data)
}
@ -300,21 +307,23 @@ test('set bind', async function (t) {
test('relay', async function (t) {
const [, a, b, c] = await makeSwarm(4, t)
const ROUTE = 1
b.on('request', function (req) {
t.is(req.command, 'route', 'b got request')
t.is(req.command, ROUTE, 'b got request')
t.is(req.from.port, a.address().port, 'from a')
const value = Buffer.concat([req.value, Buffer.from('b')])
req.relay(value, { host: '127.0.0.1', port: c.address().port })
})
c.on('request', function (req) {
t.is(req.command, 'route', 'c got request')
t.is(req.command, ROUTE, 'c got request')
t.is(req.from.port, b.address().port, 'from b')
const value = Buffer.concat([req.value, Buffer.from('c')])
req.reply(value, { to: { host: '127.0.0.1', port: a.address().port } })
})
const res = await a.request({ command: 'route', value: Buffer.from('a') }, { host: '127.0.0.1', port: b.address().port })
const res = await a.request({ command: ROUTE, value: Buffer.from('a') }, { host: '127.0.0.1', port: b.address().port })
t.alike(res.value, Buffer.from('abc'))
t.is(res.from.port, c.address().port)

Loading…
Cancel
Save