Browse Source

move to secure ids

session-estimator
Mathias Buus 4 years ago
parent
commit
c8e300eaf5
  1. 235
      index.js
  2. 25
      lib/id.js
  3. 81
      lib/messages.js
  4. 4
      lib/nat-analyzer.js
  5. 71
      lib/query.js
  6. 64
      lib/rpc.js
  7. 18
      test.js

235
index.js

@ -2,6 +2,7 @@ const dns = require('dns')
const RPC = require('./lib/rpc')
const Query = require('./lib/query')
const race = require('./lib/race')
const nodeId = require('./lib/id')
const NatAnalyzer = require('./lib/nat-analyzer')
const Table = require('kademlia-routing-table')
const TOS = require('time-ordered-set')
@ -11,8 +12,8 @@ const { EventEmitter } = require('events')
const TICK_INTERVAL = 5000
const SLEEPING_INTERVAL = 3 * TICK_INTERVAL
const STABLE_TICKS = 240 // if nothing major bad happens in ~20mins we can consider this node stable (if nat is friendly)
const MORE_STABLE_TICKS = 3 * STABLE_TICKS
const PERSISTENT_TICKS = 240 // if nothing major bad happens in ~20mins we can consider this node stable (if nat is friendly)
const MORE_PERSISTENT_TICKS = 3 * PERSISTENT_TICKS
const REFRESH_TICKS = 60 // refresh every ~5min when idle
const RECENT_NODE = 20 // we've heard from a node less than 1min ago
const OLD_NODE = 360 // if an node has been around more than 30 min we consider it old
@ -23,16 +24,14 @@ sodium.crypto_generichash(PING_BOOTSTRAP, Buffer.from('ping bootstrap'))
class Request {
constructor (dht, m) {
this.rpc = dht.rpc
this.dht = dht
this.tid = m.tid
this.from = m.from
this.to = m.to
this.nodeId = m.nodeId
this.token = m.token
this.target = m.target
this.closerNodes = m.closerNodes
this.status = m.status
this.token = m.token
this.command = m.command
this.value = m.value
}
@ -41,12 +40,12 @@ class Request {
return this.token !== null
}
error (code) {
return this.dht._reply(this.rpc, this.tid, this.target, code, null, false, this.from)
error (code, token = false) {
return this.dht._reply(this.tid, this.target, code, null, this.from, token)
}
reply (value, token = true) {
return this.dht._reply(this.rpc, this.tid, this.target, 0, value, token, this.from)
return this.dht._reply(this.tid, this.target, 0, value, this.from, token)
}
}
@ -54,46 +53,52 @@ class DHT extends EventEmitter {
constructor (opts = {}) {
super()
const id = opts.id || randomBytes(32)
this.bootstrapNodes = opts.bootstrap === false ? [] : (opts.bootstrap || []).map(parseNode)
this.nodes = new TOS()
this.table = new Table(id)
// this is just the private id, see below in the persistence handler
this.table = new Table(opts.id || randomBytes(32))
this.rpc = new RPC({
maxWindow: opts.maxWindow,
socket: opts.socket,
onwarning: opts.onwarning,
onwarning: opts.onwarning || console.error,
onrequest: this._onrequest.bind(this),
onresponse: this._onresponse.bind(this)
})
this.bootstrapped = false
this.concurrency = opts.concurrency || 16
this.ephemeral = !!opts.ephemeral
this.adaptive = !!opts.adaptive
this.ephemeral = true
this.adaptive = opts.ephemeral === false || opts.adaptive !== false
this.clientOnly = !this.adaptive
this._forcePersistent = opts.ephemeral === false
this._repinging = 0
this._reping = new FIFO(128)
this._bootstrapping = this.bootstrap()
this._secrets = [randomBytes(32), randomBytes(32)]
// make sure to random offset all the network ticks
this._tick = randomOffset(100)
this._refreshTicks = randomOffset(REFRESH_TICKS)
this._pingBootstrapTicks = randomOffset(REFRESH_TICKS)
this._stableTicks = this.adaptive ? STABLE_TICKS : 0
this._persistentTicks = this.adaptive ? PERSISTENT_TICKS : 0
this._tickInterval = setInterval(this._ontick.bind(this), TICK_INTERVAL)
this._rotateSecrets = false
this._lastTick = Date.now()
this._nat = new NatAnalyzer(opts.natSampleSize || 16)
this._onrow = (row) => row.on('full', (node) => this._onfullrow(node, row))
this._rotateSecrets = false
this._secrets = [
Buffer.alloc(32),
Buffer.alloc(32)
]
this.table.on('row', (row) => row.on('full', (node) => this._onfullrow(node, row)))
}
sodium.randombytes_buf(this._secrets[0])
sodium.randombytes_buf(this._secrets[1])
get nodeId () {
return this.table.id
this.table.on('row', this._onrow)
}
static createRPCSocket (opts) {
return new RPC(opts)
get id () {
return this.ephemeral ? null : this.table.id
}
ready () {
@ -115,8 +120,8 @@ class DHT extends EventEmitter {
tid: 0,
from: null,
to,
token: to.token || null,
nodeId: this.ephemeral ? null : this.table.id,
id: this.ephemeral ? null : this.table.id,
token: to.token,
target,
closerNodes: null,
command,
@ -126,9 +131,6 @@ class DHT extends EventEmitter {
}
requestAll (target, command, value, nodes, opts = {}) {
if (nodes instanceof Table) nodes = nodes.closest(nodes.id)
if (nodes instanceof Query) nodes = nodes.table.closest(nodes.table.id)
const min = typeof opts.min === 'number' ? opts.min : 1
if (nodes.length < min) return Promise.reject(new Error('Too few nodes to request'))
@ -154,16 +156,14 @@ class DHT extends EventEmitter {
}
async bootstrap () {
return new Promise((resolve) => {
this._backgroundQuery(this.table.id, 'find_node', null)
.on('close', () => {
if (!this.bootstrapped) {
this.bootstrapped = true
this.emit('ready')
}
resolve()
})
})
for (let i = 0; i < 2; i++) {
await this._backgroundQuery(this.table.id, 'find_node', null).finished()
if (this.bootstrapped || !this._forcePersistent || !this._onpersistent()) break
}
if (this.bootstrapped) return
this.bootstrapped = true
this.emit('ready')
}
_backgroundQuery (target, command, value) {
@ -182,9 +182,17 @@ class DHT extends EventEmitter {
return q
}
_token (addr, i) {
const token = Buffer.allocUnsafe(32)
this._rotateSecrets = true
// TODO: also add .port?
sodium.crypto_generichash(token, Buffer.from(addr.host), this._secrets[i])
return token
}
refresh () {
const node = this.table.random()
this._backgroundQuery(node ? node.nodeId : this.table.id, 'find_node', null)
this._backgroundQuery(node ? node.id : this.table.id, 'find_node', null)
}
_pingSomeBootstrapNodes () {
@ -196,7 +204,7 @@ class DHT extends EventEmitter {
this._pingBootstrapTicks = REFRESH_TICKS
const nodes = this.table.closest(PING_BOOTSTRAP, 1)
if (nodes.length === 0 || compare(PING_BOOTSTRAP, this.table.id, nodes[0].nodeId) > 0) {
if (nodes.length === 0 || compare(PING_BOOTSTRAP, this.table.id, nodes[0].id) > 0) {
return
}
@ -205,7 +213,7 @@ class DHT extends EventEmitter {
q.on('close', () => {
if (q.closest.length === 0) return
if (compare(PING_BOOTSTRAP, this.table.id, q.closest[q.closest.length - 1].nodeId) > 0) {
if (compare(PING_BOOTSTRAP, this.table.id, q.closest[q.closest.length - 1].id) > 0) {
return
}
@ -233,32 +241,25 @@ class DHT extends EventEmitter {
while (cnt--) {
if (!oldest || this._tick === oldest.seen) continue
this._check(oldest)
this._check(oldest, oldest.seen)
oldest = oldest.next
}
}
_check (node) {
_check (node, lastSeen) {
this.ping(node)
.then(
m => this._maybeRemoveNode(node, m.nodeId),
() => this._removeStaleNode(node, lastSeen),
() => this._removeNode(node)
)
}
_token (peer, i) {
this._rotateSecrets = true
const out = Buffer.allocUnsafe(32)
sodium.crypto_generichash(out, Buffer.from(peer.host), this._secrets[i])
return out
}
_ontick () {
if (this._rotateSecrets) {
const tmp = this._secrets[0]
this._secrets[0] = this._secrets[1]
this._secrets[1] = tmp
sodium.randombytes_buf(tmp)
sodium.crypto_generichash(tmp, tmp)
}
const time = Date.now()
@ -273,8 +274,8 @@ class DHT extends EventEmitter {
if (!this.bootstrapped) return
if (this.adaptive && this.ephemeral && --this._stableTicks <= 0) {
this._onstable()
if (this.adaptive && this.ephemeral && --this._persistentTicks <= 0) {
this._onpersistent()
}
if ((this._tick & 7) === 0) {
@ -290,24 +291,42 @@ class DHT extends EventEmitter {
}
}
_onstable () {
if (this.remoteAddress().type === NatAnalyzer.PORT_CONSISTENT) {
this.emit('stable')
} else {
this._stableTicks = MORE_STABLE_TICKS
_onpersistent () {
if (this.ephemeral === false) return false
// TODO: do nat check also
const addr = this.remoteAddress(this._forcePersistent ? 1 : 3)
if (addr.type !== NatAnalyzer.PORT_CONSISTENT) {
this._persistentTicks = MORE_PERSISTENT_TICKS
return false
}
const id = nodeId(addr.host, addr.port)
if (this.table.id.equals(id)) return false
const nodes = this.table.toArray()
this.table = new Table(id)
for (const node of nodes) this.table.add(node)
this.table.on('row', this._onrow)
this.ephemeral = false
this.emit('persistent')
return true
}
_onwakeup () {
this._tick += 2 * OLD_NODE // bump the tick enough that everything appears old.
this._tick += 8 - (this._tick & 7) - 2 // triggers a series of pings in two ticks
this._stableTicks = MORE_STABLE_TICKS
this._persistentTicks = MORE_PERSISTENT_TICKS
this._pingBootstrapTicks = REFRESH_TICKS // forced ephemeral, so no need for a bootstrap ping soon
this._refreshTicks = 1 // triggers a refresh next tick (allow network time to wake up also)
if (this.adaptive) {
this.ephemeral = true
this.emit('unstable')
this.emit('ephemeral')
}
this.emit('wakeup')
@ -320,7 +339,7 @@ class DHT extends EventEmitter {
_repingMaybe () {
while (this._repinging < 3 && this._reping.isEmpty() === false) {
const { newNode, row } = this._reping.shift()
if (this.table.get(newNode.nodeId)) continue
if (this.table.get(newNode.id)) continue
let oldest = null
for (const node of row.nodes) {
@ -337,12 +356,13 @@ class DHT extends EventEmitter {
_repingAndSwap (newNode, oldNode) {
const self = this
const lastSeen = oldNode.seen
this._repinging++
this.ping(oldNode).then(onsuccess, onswap)
function onsuccess (m) {
if (m.nodeId === null || !m.nodeId.equals(oldNode.nodeId)) return onswap()
if (oldNode.seen <= lastSeen) return onswap()
self._repinging--
self._repingMaybe()
}
@ -363,14 +383,14 @@ class DHT extends EventEmitter {
for (const node of this.bootstrapNodes) {
dns.lookup(node.host, (_, host) => {
if (host) nodes.push({ nodeId: node.nodeId || null, host, port: node.port })
if (host) nodes.push({ id: nodeId(host, node.port), host, port: node.port })
if (--missing === 0) done(nodes)
})
}
}
_addNode (node) {
if (this.nodes.has(node) || node.nodeId.equals(this.table.id)) return
if (this.nodes.has(node) || node.id.equals(this.table.id)) return
node.added = node.seen = this._tick
@ -379,29 +399,40 @@ class DHT extends EventEmitter {
this.emit('add-node', node)
}
_maybeRemoveNode (node, expectedId) {
if (expectedId !== null && expectedId.equals(node.nodeId)) return
this._removeNode(node)
_removeStaleNode (node, lastSeen) {
if (node.seen <= lastSeen) this._removeNode(node)
}
_removeNode (node) {
if (!this.nodes.has(node)) return
this.table.remove(node.nodeId)
this.table.remove(node.id)
this.nodes.remove(node)
this.emit('remove-node', node)
}
_addNodeFromMessage (m) {
const oldNode = this.table.get(m.nodeId)
const id = nodeId(m.from.host, m.from.port)
// verify id, if the id is mismatched it doesn't strictly mean the node is bad, could be
// a weird NAT thing, that the node is unaware of - in any case it def means we should not
// add this node to our routing table
if (!m.id.equals(id)) {
m.id = null
return
}
const oldNode = this.table.get(id)
// TODO: if the to.host is different than what the node has told us previously
// ALSO add it to the nat analyser as we have likely changed networks...
// If we DO change the ip, it should factor into our adaptive logic as well potentially
// refresh it, if we've seen this before
if (oldNode) {
if (oldNode.port === m.from.port && oldNode.host === m.from.host) {
// refresh it
oldNode.seen = this._tick
this.nodes.add(oldNode)
}
oldNode.seen = this._tick
this.nodes.add(oldNode)
return
}
@ -409,11 +440,10 @@ class DHT extends EventEmitter {
this._nat.add(m.to)
this._addNode({
nodeId: m.nodeId,
token: null,
id,
port: m.from.port,
host: m.from.host,
id: m.nodeId, // alias for nodeId as the routing table expects that
token: null, // adding this so it has the same "shape" as the query nodes for easier debugging
added: this._tick,
seen: this._tick,
prev: null,
@ -422,38 +452,35 @@ class DHT extends EventEmitter {
}
_onrequest (req) {
if (req.nodeId !== null) this._addNodeFromMessage(req)
if (req.token !== null) {
if (!req.token.equals(this._token(req.from, 1)) && !req.token.equals(this._token(req.from, 0))) {
req.token = null
}
// check if the roundtrip token is one we've generated within the last 10s for this peer
if (req.token !== null && !this._token(req.from, 1).equals(req.token) && !this._token(req.from, 0).equals(req.token)) {
req.token = null
}
// empty reply back
if (req.id !== null) this._addNodeFromMessage(req)
else this._pingBootstrapTicks = REFRESH_TICKS
// o/ if this node is ephemeral, it prob originated from a bootstrapper somehow so no need to ping them
// echo the value back
if (req.command === 'ping') {
this._reply(this.rpc, req.tid, null, 0, null, false, req.from)
this._reply(req.tid, null, 0, req.value, req.from, false)
return
}
// empty dht reply back
if (req.command === 'find_node') {
this._reply(this.rpc, req.tid, req.target, 0, null, false, req.from)
this._reply(req.tid, req.target, 0, null, req.from, false)
return
}
// if this node is ephemeral, it prob came from a bootstrapper somehow so no need to ping them
if (req.nodeId === null) {
this._pingBootstrapTicks = REFRESH_TICKS
}
if (this.emit('request', new Request(this, req)) === false) {
this._reply(this.rpc, req.tid, req.target, 1, null, false, req.from)
this._reply(req.tid, req.target, 1, null, req.from, false)
}
}
_onresponse (res) {
if (res.nodeId !== null) this._addNodeFromMessage(res)
if (res.id !== null) this._addNodeFromMessage(res)
else if (this._nat.length < 3) this._nat.add(res.to)
}
bind (...args) {
@ -464,20 +491,19 @@ class DHT extends EventEmitter {
return this.rpc.address()
}
remoteAddress () {
return this._nat.analyze()
remoteAddress (minSamples) {
return this._nat.analyze(minSamples)
}
_reply (rpc, tid, target, status, value, token, to) {
_reply (tid, target, status, value, to, addToken) {
const closerNodes = target ? this.table.closest(target) : null
const persistent = !this.ephemeral && rpc === this.rpc
const reply = {
version: 1,
tid,
from: null,
to,
token: token ? this._token(to, 1) : null,
nodeId: persistent ? this.table.id : null,
id: this.ephemeral ? null : this.table.id,
token: addToken ? this._token(to, 1) : null,
target: null,
closerNodes,
command: null,
@ -485,7 +511,7 @@ class DHT extends EventEmitter {
value
}
rpc.send(reply)
this.rpc.send(reply)
return reply
}
}
@ -502,13 +528,12 @@ module.exports = DHT
function parseNode (s) {
if (typeof s === 'object') return s
const [, id, host, port] = s.match(/([a-f0-9]{64}@)?([^:@]+)(:\d+)?$/i)
if (!port) throw new Error('Node format is id@?host:port')
const [host, port] = s.split(':')
if (!port) throw new Error('Bootstrap node format is host:port')
return {
nodeId: id ? Buffer.from(id.slice(0, -1), 'hex') : null,
host,
port: Number(port.slice(1))
port: Number(port)
}
}

25
lib/id.js

@ -0,0 +1,25 @@
const sodium = require('sodium-universal')
const addr = Buffer.alloc(6)
let i = 0
module.exports = hash
function num (ip) {
let n = 0
let c = 0
while (i < ip.length && (c = ip.charCodeAt(i++)) !== 46) n = n * 10 + (c - 48)
return n
}
function hash (ip, port, out = Buffer.allocUnsafe(32)) {
i = 0
addr[0] = num(ip)
addr[1] = num(ip)
addr[2] = num(ip)
addr[3] = num(ip)
addr[4] = port
addr[5] = port >>> 8
sodium.crypto_generichash(out, addr)
return out
}

81
lib/messages.js

@ -4,7 +4,7 @@ const IPv4 = exports.IPv4 = {
preencode (state, ip) {
state.end += 4
},
encode (state, ip) {
encode (state, ip) { // TODO: move over fast parser from ./id.js
const nums = ip.split('.')
state.buffer[state.start++] = Number(nums[0]) || 0
state.buffer[state.start++] = Number(nums[1]) || 0
@ -33,40 +33,14 @@ const peerIPv4 = {
}
}
const dhtPeerIPv4 = exports.dhtPeerIPv4 = {
preencode (state, peer) {
state.end += 6 + 32
},
encode (state, peer) {
cenc.fixed32.encode(state, peer.nodeId)
IPv4.encode(state, peer.host)
cenc.uint16.encode(state, peer.port)
},
decode (state) {
return {
nodeId: cenc.fixed32.decode(state),
host: IPv4.decode(state),
port: cenc.uint16.decode(state)
}
}
}
const peerIPv4Array = exports.peerIPv4Array = cenc.array(peerIPv4)
const dhtPeerIPv4Array = exports.dhtPeerIPv4Array = cenc.array(dhtPeerIPv4)
/* eslint-disable no-multi-spaces */
const TYPE = 0b0001
const HAS_TOKEN = 0b0010
const HAS_NODE_ID = 0b0100
const HAS_TARGET = 0b1001
const HAS_CLOSER_NODES = 0b1001
const RESPONSE = 0b0000
const REQUEST = 0b0001
const TOKEN = 0b0010
const NODE_ID = 0b0100
const TARGET = 0b1000 | REQUEST
const CLOSER_NODES = 0b1000 | RESPONSE
const IS_REQUEST = 0b0001
const HAS_ID = 0b0010
const HAS_TOKEN = 0b0100
const ROUTE_INFO = 0b1000 | IS_REQUEST
const HAS_TARGET = ROUTE_INFO | IS_REQUEST
const HAS_CLOSER_NODES = ROUTE_INFO ^ IS_REQUEST
exports.message = {
preencode (state, m) {
@ -75,10 +49,10 @@ exports.message = {
state.end += 2 // tid
state.end += 6 // to
if (m.id) state.end += 32
if (m.token) state.end += 32
if (m.nodeId) state.end += 32
if (m.target) state.end += 32
if (m.closerNodes && m.closerNodes.length) dhtPeerIPv4Array.preencode(state, m.closerNodes)
if (m.closerNodes && m.closerNodes.length) peerIPv4Array.preencode(state, m.closerNodes)
if (m.command) cenc.string.preencode(state, m.command)
else cenc.uint.preencode(state, m.status)
@ -86,23 +60,24 @@ exports.message = {
},
encode (state, m) {
const closerNodes = m.closerNodes || []
const flags = (m.token ? HAS_TOKEN : 0) |
(m.nodeId ? NODE_ID : 0) |
(m.target ? TARGET : 0) |
(closerNodes.length ? CLOSER_NODES : 0) |
(m.command ? REQUEST : 0)
const flags = (m.id ? HAS_ID : 0) |
(m.token ? HAS_TOKEN : 0) |
(closerNodes.length ? HAS_CLOSER_NODES : 0) |
(m.target ? HAS_TARGET : 0) |
(m.command ? IS_REQUEST : 0)
state.buffer[state.start++] = 1
state.buffer[state.start++] = flags
cenc.uint16.encode(state, m.tid)
peerIPv4.encode(state, m.to)
if ((flags & HAS_TOKEN) === TOKEN) cenc.fixed32.encode(state, m.token)
if ((flags & HAS_NODE_ID) === NODE_ID) cenc.fixed32.encode(state, m.nodeId)
if ((flags & HAS_TARGET) === TARGET) cenc.fixed32.encode(state, m.target)
if ((flags & HAS_CLOSER_NODES) === CLOSER_NODES) dhtPeerIPv4Array.encode(state, closerNodes)
if ((flags & TYPE) === REQUEST) cenc.string.encode(state, m.command)
if ((flags & TYPE) === RESPONSE) cenc.uint.encode(state, m.status)
if ((flags & HAS_ID) === HAS_ID) cenc.fixed32.encode(state, m.id)
if ((flags & HAS_TOKEN) === HAS_TOKEN) cenc.fixed32.encode(state, m.token)
if ((flags & ROUTE_INFO) === HAS_TARGET) cenc.fixed32.encode(state, m.target)
if ((flags & ROUTE_INFO) === HAS_CLOSER_NODES) peerIPv4Array.encode(state, closerNodes)
if ((flags & IS_REQUEST) === IS_REQUEST) cenc.string.encode(state, m.command)
if ((flags & IS_REQUEST) === 0) cenc.uint.encode(state, m.status)
cenc.buffer.encode(state, m.value)
},
@ -120,12 +95,12 @@ exports.message = {
tid: cenc.uint16.decode(state),
from: null, // populated in caller
to: peerIPv4.decode(state),
token: ((flags & HAS_TOKEN) === TOKEN) ? cenc.fixed32.decode(state) : null,
nodeId: ((flags & HAS_NODE_ID) === NODE_ID) ? cenc.fixed32.decode(state) : null,
target: ((flags & HAS_TARGET) === TARGET) ? cenc.fixed32.decode(state) : null,
closerNodes: ((flags & HAS_CLOSER_NODES) === CLOSER_NODES) ? dhtPeerIPv4Array.decode(state) : null,
command: ((flags & TYPE) === REQUEST) ? cenc.string.decode(state) : null,
status: ((flags & TYPE) === RESPONSE) ? cenc.uint.decode(state) : 0,
id: (flags & HAS_ID) === HAS_ID ? cenc.fixed32.decode(state) : null,
token: (flags & HAS_TOKEN) === HAS_TOKEN ? cenc.fixed32.decode(state) : null,
target: ((flags & ROUTE_INFO) === HAS_TARGET) ? cenc.fixed32.decode(state) : null,
closerNodes: ((flags & ROUTE_INFO) === HAS_CLOSER_NODES) ? peerIPv4Array.decode(state) : null,
command: ((flags & IS_REQUEST) === IS_REQUEST) ? cenc.string.decode(state) : null,
status: ((flags & IS_REQUEST) === 0) ? cenc.uint.decode(state) : 0,
value: cenc.buffer.decode(state)
}
}

4
lib/nat-analyzer.js

@ -15,8 +15,8 @@ class NatAnalyzer {
this.top = (this.top + 1) & (this.samples.length - 1)
}
analyze () {
if (this.length <= 2) return { type: NatAnalyzer.UNKNOWN, host: null, port: 0 }
analyze (minSamples = 3) {
if (this.length < minSamples) return { type: NatAnalyzer.UNKNOWN, host: null, port: 0 }
const samples = this.samples.slice(0, this.length)
const hosts = new Map()

71
lib/query.js

@ -1,5 +1,6 @@
const { Readable } = require('streamx')
const race = require('./race')
const nodeId = require('./id')
module.exports = class Query extends Readable {
constructor (dht, target, command, value, opts = {}) {
@ -15,15 +16,15 @@ module.exports = class Query extends Readable {
this.concurrency = opts.concurrency || this.k
this.inflight = 0
this.map = opts.map || defaultMap
this.closests = []
this._closestReplies = []
this._slowdown = false
this._seen = new Set()
this._pending = []
this._onresolve = this._onvisit.bind(this)
this._onreject = this._onerror.bind(this)
this._fromTable = false
this._commit = opts.commit || null
this._commit = opts.commit === true ? autoCommit : (opts.commit || null)
this._commiting = false
const nodes = opts.nodes || opts.closest
@ -32,15 +33,11 @@ module.exports = class Query extends Readable {
// add them reverse as we pop below
for (let i = nodes.length - 1; i >= 0; i--) {
const node = nodes[i]
this._addPending(node.nodeId, node.host, node.port)
this._addPending(node.id || null, node.host, node.port)
}
}
}
get closest () {
return this._closestReplies.map(r => ({ nodeId: r.nodeId, host: r.from.host, port: r.from.port }))
}
finished () {
return new Promise((resolve, reject) => {
const self = this
@ -65,7 +62,7 @@ module.exports = class Query extends Readable {
async commit (command = this.command, value = this.value, opts) {
if (typeof command === 'object' && command) return this.commit(undefined, undefined, command)
return this.dht.requestAll(this.target, command, value, this._closestReplies, opts)
return this.dht.requestAll(this.target, command, value, this.closests, opts)
}
async toArray () {
@ -82,7 +79,7 @@ module.exports = class Query extends Readable {
const closest = this.dht.table.closest(this.target, this.k - this._pending.length)
for (const node of closest) {
this._addPending(node.nodeId, node.host, node.port)
this._addPending(node.id, node.host, node.port)
}
}
@ -92,22 +89,23 @@ module.exports = class Query extends Readable {
this.dht._resolveBootstrapNodes((bootstrapNodes) => {
for (const node of bootstrapNodes) {
this._addPending(node.nodeId, node.host, node.port)
this._addPending(node.id, node.host, node.port)
}
cb(null)
})
}
_isCloser (id) {
return this._closestReplies.length < this.k || this._compare(id, this._closestReplies[this._closestReplies.length - 1].nodeId) < 0
return this.closests.length < this.k || this._compare(id, this.closests[this.closests.length - 1].id) < 0
}
_addPending (nodeId, host, port) {
if (nodeId && !this._isCloser(nodeId)) return
_addPending (id, host, port) {
if (id && !this._isCloser(id)) return false
const addr = host + ':' + port
if (this._seen.has(addr)) return
if (this._seen.has(addr)) return true
this._seen.add(addr)
this._pending.push({ nodeId, host, port })
this._pending.push({ id, host, port })
return true
}
_read (cb) {
@ -122,7 +120,7 @@ module.exports = class Query extends Readable {
while (this.inflight < concurrency && this._pending.length > 0) {
const next = this._pending.pop()
if (next && next.nodeId && !this._isCloser(next.nodeId)) continue
if (next && next.id && !this._isCloser(next.id)) continue
this._visit(next)
}
@ -149,7 +147,7 @@ module.exports = class Query extends Readable {
return
}
if (!this._closestReplies.length) {
if (!this.closests.length) {
this.destroy(new Error('Too few nodes responded'))
return
}
@ -158,7 +156,7 @@ module.exports = class Query extends Readable {
this._commiting = true
const p = []
for (const node of this._closestReplies) p.push(this._commit(this.dht, this.target, node))
for (const node of this.closests) p.push(this._commit(node, this.dht, this))
race(p, 1, p.length)
.then(() => this.push(null), (err) => this.destroy(err))
@ -170,14 +168,22 @@ module.exports = class Query extends Readable {
this.inflight--
if (m.nodeId !== null && this._isCloser(m.nodeId)) {
this._pushClosest(m)
if (m.id !== null && this._isCloser(m.id)) {
this._pushClosest({
id: m.id,
host: m.from.host,
port: m.from.port,
token: m.token,
status: m.status,
value: m.value
})
}
if (m.closerNodes !== null) {
for (const node of m.closerNodes) {
if (node.nodeId.equals(this.dht.table.id)) continue
this._addPending(node.nodeId, node.host, node.port)
const id = nodeId(node.host, node.port)
if (id.equals(this.dht.table.id)) continue
if (!this._addPending(id, node.host, node.port)) break
}
}
@ -203,22 +209,22 @@ module.exports = class Query extends Readable {
}
_pushClosest (node) {
this._closestReplies.push(node)
for (let i = this._closestReplies.length - 2; i >= 0; i--) {
const prev = this._closestReplies[i]
const cmp = this._compare(prev.nodeId, node.nodeId)
this.closests.push(node)
for (let i = this.closests.length - 2; i >= 0; i--) {
const prev = this.closests[i]
const cmp = this._compare(prev.id, node.id)
// if sorted, done!
if (cmp < 0) break
// if dup, splice it out (rare)
if (cmp === 0) {
this._closestReplies.splice(i + 1, 1)
this.closests.splice(i + 1, 1)
break
}
// swap and continue down
this._closestReplies[i + 1] = prev
this._closestReplies[i] = node
this.closests[i + 1] = prev
this.closests[i] = node
}
if (this._closestReplies.length > this.k) this._closestReplies.pop()
if (this.closests.length > this.k) this.closests.pop()
}
_compare (a, b) {
@ -237,6 +243,11 @@ module.exports = class Query extends Readable {
}
}
function autoCommit (node, dht, query) {
if (!node.token) return Promise.reject(new Error('No token received for closest node'))
return dht.request(query.target, query.command, query.value, node)
}
function defaultMap (m) {
return m
}

64
lib/rpc.js

@ -1,44 +1,30 @@
const dgram = require('dgram')
const { message } = require('./messages')
const DEFAULT_TTL = 64
const HOLEPUNCH = Buffer.from([0])
module.exports = class RPC {
constructor (opts = {}) {
this._ttl = DEFAULT_TTL
this._pendingSends = []
this._sending = 0
this._onflush = onflush.bind(this)
this._tid = (Math.random() * 65536) | 0
this._drainInterval = null
this._tick = 0
this._w = 0
this._win = [0, 0, 0, 0]
this.maxWindow = 80 // ~100 per second burst, ~80 per second avg
this.maxWindow = opts.maxWindow || 80 // ~100 per second burst, ~80 per second avg
this.maxRetries = 3
this.destroyed = false
this.inflight = []
this.onholepunch = opts.onholepunch || noop
this.onrequest = opts.onrequest || noop
this.onresponse = opts.onresponse || noop
this.onwarning = opts.onwarning || noop
this.onconnection = opts.onconnection || noop
this.socket = opts.socket || dgram.createSocket('udp4')
this.socket.on('message', this._onmessage.bind(this))
if (this.onconnection) this.socket.on('connection', this._onutpconnection.bind(this))
}
get inflightRequests () {
return this.inflight.length
}
connect (addr) {
if (!this.socket.connect) throw new Error('UTP needed for connections')
return this.socket.connect(addr.port, addr.host)
}
send (m) {
const state = { start: 0, end: 0, buffer: null }
@ -46,7 +32,7 @@ module.exports = class RPC {
state.buffer = Buffer.allocUnsafe(state.end)
message.encode(state, m)
this._send(state.buffer, DEFAULT_TTL, m.to)
this._send(state.buffer, m.to)
}
reply (req, reply) {
@ -55,15 +41,6 @@ module.exports = class RPC {
this.send(reply)
}
holepunch (addr, ttl = DEFAULT_TTL) {
return new Promise((resolve) => {
this._send(HOLEPUNCH, ttl, addr, (err) => {
this._onflush()
resolve(!err)
})
})
}
address () {
return this.socket.address()
}
@ -113,8 +90,6 @@ module.exports = class RPC {
}
this.inflight = []
if (!closing) this.socket.setTTL(DEFAULT_TTL)
return this.socket
}
@ -153,7 +128,7 @@ module.exports = class RPC {
if (total < 2 * this.maxWindow && this._win[this._w] < this.maxWindow) {
this._win[this._w]++
req.tries++
this._send(req.buffer, DEFAULT_TTL, req.to)
this._send(req.buffer, req.to)
}
})
}
@ -189,6 +164,9 @@ module.exports = class RPC {
const req = this._dequeue(m.tid)
if (req === null) return
// decrement the inflight window as this is an "ack"
if (this._win[this._w] > 0) this._win[this._w]--
this.onresponse(m, this)
if (m.status === 0 || req.expectOk === false) {
@ -198,24 +176,9 @@ module.exports = class RPC {
}
}
_send (buffer, ttl, addr, done) {
if ((this._ttl !== ttl && this._sending > 0) || this._pendingSends.length > 0) {
this._pendingSends.push({ buffer, ttl, addr, done })
} else {
this._sendNow(buffer, ttl, addr, done)
}
}
_sendNow (buf, ttl, addr, done) {
_send (buf, addr) {
if (this.destroyed) return
this._sending++
if (ttl !== this._ttl) {
this._ttl = ttl
this.socket.setTTL(ttl)
}
this.socket.send(buf, 0, buf.byteLength, addr.port, addr.host, done || this._onflush)
this.socket.send(buf, 0, buf.byteLength, addr.port, addr.host)
}
_dequeue (tid) {
@ -255,7 +218,7 @@ module.exports = class RPC {
total++
this._win[this._w]++
this._send(req.buffer, DEFAULT_TTL, req.to)
this._send(req.buffer, req.to)
}
this._w = (this._w + 1) & 3
@ -275,13 +238,4 @@ function createStatusError (status) {
return err
}
function onflush () {
if (--this._sending === 0) {
while (this._pendingSends.length > 0 && (this._sending === 0 || this._pendingSends[0].ttl === this._ttl)) {
const { buffer, ttl, addr, done } = this._pendingSends.shift()
this._sendNow(buffer, ttl, addr, done)
}
}
}
function noop () {}

18
test.js

@ -12,13 +12,13 @@ tape('make bigger swarm', async function (t) {
const targetNode = swarm[25]
let q = swarm[499].query(targetNode.nodeId, 'find_node', null)
let q = swarm[499].query(targetNode.id, 'find_node', null)
let messages = 0
let found = false
for await (const data of q) {
messages++
if (data.nodeId.equals(targetNode.nodeId)) {
if (data.id.equals(targetNode.id)) {
found = true
break
}
@ -26,13 +26,13 @@ tape('make bigger swarm', async function (t) {
t.ok(found, 'found target in ' + messages + ' message(s)')
q = swarm[490].query(targetNode.nodeId, 'find_node', null, { closest: q.closest })
q = swarm[490].query(targetNode.id, 'find_node', null, { closest: q.closest })
messages = 0
found = false
for await (const data of q) {
messages++
if (data.nodeId.equals(targetNode.nodeId)) {
if (data.id.equals(targetNode.id)) {
found = true
break
}
@ -67,8 +67,8 @@ tape('commit after query', async function (t) {
}
const q = swarm[42].query(swarm[0].table.id, 'before', null, {
commit (node, target, m) {
return node.request(target, 'after', null, { token: m.token, ...m.from })
commit (node, dht, query) {
return dht.request(query.target, 'after', null, node)
}
})
@ -86,8 +86,8 @@ tape('map query stream', async function (t) {
const q = swarm[0].query(swarm[0].table.id, 'find_node', null, {
map (data) {
if (expected.length > 3) return null
expected.push(data.nodeId)
return data.nodeId
expected.push(data.id)
return data.id
}
})
@ -110,7 +110,7 @@ async function makeSwarm (n) {
const all = [node]
const bootstrap = ['localhost:' + node.address().port]
while (all.length < n) {
const node = new DHT({ bootstrap })
const node = new DHT({ ephemeral: false, bootstrap })
await node.ready()
all.push(node)
}

Loading…
Cancel
Save