Browse Source

adaptive mode and add bootstrap isolation recovery

session-estimator
Mathias Buus 4 years ago
parent
commit
a011be7093
  1. 126
      index.js
  2. 20
      lib/query.js
  3. 3
      lib/rpc.js

126
index.js

@ -11,9 +11,14 @@ const { EventEmitter } = require('events')
const TICK_INTERVAL = 5000
const SLEEPING_INTERVAL = 3 * TICK_INTERVAL
const STABLE_TICKS = 240 // if nothing major bad happens in ~20mins we can consider this node stable (if nat is friendly)
const MORE_STABLE_TICKS = 3 * STABLE_TICKS
const REFRESH_TICKS = 60 // refresh every ~5min when idle
const RECENT_NODE = 20 // we've heard from a node less than 1min ago
const OLD_NODE = 360 // if an node has been around more than 30 min we consider it old...
const OLD_NODE = 360 // if an node has been around more than 30 min we consider it old
// this is the known id for figuring out if we should ping bootstrap nodes
const PING_BOOTSTRAP = Buffer.allocUnsafe(32)
sodium.crypto_generichash(PING_BOOTSTRAP, Buffer.from('ping bootstrap'))
class Request {
constructor (dht, m) {
@ -39,7 +44,7 @@ class Request {
this.dht._reply(this.rpc, this.tid, this.target, code, null, false, this.from)
}
reply (value, token = false) {
reply (value, token = true) {
this.dht._reply(this.rpc, this.tid, this.target, 0, value, token, this.from)
}
}
@ -63,17 +68,20 @@ class DHT extends EventEmitter {
this.bootstrapped = false
this.concurrency = opts.concurrency || 16
this.ephemeral = !!opts.ephemeral
this.adaptive = !!opts.adaptive
this._repinging = 0
this._reping = new FIFO(128)
this._bootstrapping = this.bootstrap()
this._secrets = [randomBytes(32), randomBytes(32)]
this._tick = (Math.random() * 1024) | 0 // random offset it
// make sure to random offset all the network ticks
this._tick = randomOffset(100)
this._refreshTicks = randomOffset(REFRESH_TICKS)
this._pingBootstrapTicks = randomOffset(REFRESH_TICKS)
this._stableTicks = this.adaptive ? STABLE_TICKS : 0
this._tickInterval = setInterval(this._ontick.bind(this), TICK_INTERVAL)
this._rotateSecrets = false
this._lastTick = Date.now()
this._refreshTick = this._tick + REFRESH_TICKS
this._stableTick = this._tick + STABLE_TICKS
this._tickInterval = setInterval(this._ontick.bind(this), TICK_INTERVAL)
this._nat = new NatAnalyzer(opts.natSampleSize || 16)
this.table.on('row', (row) => row.on('full', (node) => this._onfullrow(node, row)))
@ -92,15 +100,15 @@ class DHT extends EventEmitter {
}
query (target, command, value, opts) {
this._refreshTick = this._tick + REFRESH_TICKS
return new Query(this, target, command, value, opts)
this._refreshTicks = REFRESH_TICKS
return new Query(this, target, command, value || null, opts)
}
ping (node) {
return this.request(null, 'ping', null, node)
}
request (target, command, value, to) {
request (target, command, value, to, opts) {
return this.rpc.request({
version: 1,
tid: 0,
@ -113,21 +121,23 @@ class DHT extends EventEmitter {
command,
status: 0,
value
})
}, opts)
}
requestAll (target, command, value, nodes, opts = {}) {
if (nodes instanceof Table) nodes = nodes.closest(nodes.id)
if (nodes instanceof Query) nodes = nodes.table.closest(nodes.table.id)
if (nodes.length === 0) return Promise.resolve([])
const min = typeof opts.min === 'number' ? opts.min : 1
const max = typeof opts.max === 'number' ? opts.max : nodes.length
if (nodes.length < min) return Promise.reject(new Error('Too few nodes to request'))
const p = []
for (const node of nodes) p.push(this.request(target, command, value, node))
let errors = 0
const results = []
const min = typeof opts.min === 'number' ? opts.min : 1
const max = typeof opts.max === 'number' ? opts.max : p.length
return new Promise((resolve, reject) => {
for (let i = 0; i < p.length; i++) p[i].then(ondone, onerror)
@ -183,6 +193,35 @@ class DHT extends EventEmitter {
this._backgroundQuery(node ? node.id : this.table.id, 'find_node', null)
}
_pingSomeBootstrapNodes () {
// once in a while it can be good to ping the bootstrap nodes, since we force them to be ephemeral to lower their load
// to make this a lightweight as possible we first check if we are the closest node we know to a known id (hash(ping bootstrap))
// and if so we issue a background query against that. if after doing this query we are still one of the closests nodes
// we ping the bootstrapper - in practice this results to very little bootstrap ping traffic.
this._pingBootstrapTicks = REFRESH_TICKS
const nodes = this.table.closest(PING_BOOTSTRAP, 1)
if (nodes.length === 0 || compare(PING_BOOTSTRAP, this.table.id, nodes[0].id) > 0) {
return
}
const q = this._backgroundQuery(PING_BOOTSTRAP, 'find_node', null)
q.on('close', () => {
if (q.closest.length === 0) return
if (compare(PING_BOOTSTRAP, this.table.id, q.closest[q.closest.length - 1].id) > 0) {
return
}
this._resolveBootstrapNodes((nodes) => {
const node = nodes[(Math.random() * nodes.length) | 0]
this.ping(node).then(noop, noop)
})
})
}
_pingSome () {
let cnt = this.rpc.inflightRequests > 2 ? 3 : 5
let oldest = this.nodes.oldest
@ -231,11 +270,7 @@ class DHT extends EventEmitter {
const time = Date.now()
if (time - this._lastTick > SLEEPING_INTERVAL) {
this._stableTick = 0 // never stable
this._tick += 2 * OLD_NODE // bump the tick enough that everything appears old.
this._tick += 8 - (this._tick & 7) - 2 // triggers a series of pings in two ticks
this._refreshTick = this._tick + 1 // triggers a refresh next tick (allow network time to wake up also)
this.emit('wakeup')
this._onwakeup()
} else {
this._tick++
}
@ -244,21 +279,46 @@ class DHT extends EventEmitter {
if (!this.bootstrapped) return
if (this._tick === this._stableTick) {
if (this.remoteAddress().type === NatAnalyzer.PORT_CONSISTENT) {
this.emit('stable')
}
if (this.adaptive && this.ephemeral && --this._stableTicks <= 0) {
this._onstable()
}
if ((this._tick & 7) === 0) {
this._pingSome()
}
if (((this._tick & 63) === 0 && this.nodes.length < this.table.k) || this._tick >= this._refreshTick) {
if (!this.ephemeral && --this._pingBootstrapTicks <= 0) {
this._pingSomeBootstrapNodes()
}
if (((this._tick & 63) === 0 && this.nodes.length < this.table.k) || --this._refreshTicks <= 0) {
this.refresh()
}
}
_onstable () {
if (this.remoteAddress().type === NatAnalyzer.PORT_CONSISTENT) {
this.emit('stable')
} else {
this._stableTicks = MORE_STABLE_TICKS
}
}
_onwakeup () {
this._tick += 2 * OLD_NODE // bump the tick enough that everything appears old.
this._tick += 8 - (this._tick & 7) - 2 // triggers a series of pings in two ticks
this._stableTicks = MORE_STABLE_TICKS
this._pingBootstrapTicks = REFRESH_TICKS // forced ephemeral, so no need for a bootstrap ping soon
this._refreshTicks = 1 // triggers a refresh next tick (allow network time to wake up also)
if (this.adaptive) {
this.ephemeral = true
this.emit('unstable')
}
this.emit('wakeup')
}
_onfullrow (newNode, row) {
if (this.bootstrapped && this._reping.push({ newNode, row })) this._repingMaybe()
}
@ -387,6 +447,11 @@ class DHT extends EventEmitter {
return
}
// if this node is ephemeral, it prob came from a bootstrapper somehow so no need to ping them
if (req.nodeId === null) {
this._pingBootstrapTicks = REFRESH_TICKS
}
if (this.emit('request', new Request(this, req)) === false) {
this._reply(this.rpc, req.tid, req.target, 1, null, false, req.from)
}
@ -455,3 +520,18 @@ function randomBytes (n) {
sodium.randombytes_buf(b)
return b
}
function noop () {}
function compare (id, a, b) {
for (let i = 0; i < id.length; i++) {
if (a[i] === b[i]) continue
const t = id[i]
return (t ^ a[i]) - (t ^ b[i])
}
return 0
}
function randomOffset (n) {
return n - ((Math.random() * 0.5 * n) | 0)
}

20
lib/query.js

@ -137,7 +137,9 @@ module.exports = class Query extends Readable {
}
_onvisit (m) {
this.successes++
if (m.status === 1) this.successes++
else this.errors++
this.inflight--
if (m.nodeId !== null && this._isCloser(m.nodeId)) {
@ -162,7 +164,13 @@ module.exports = class Query extends Readable {
this._slowdown = false
}
if (this.push(this.map(m)) !== false) this._readMore()
if (m.status !== 0 || this.push(this.map(m)) !== false) this._readMore()
}
_onerror () {
this.errors++
this.inflight--
this._readMore()
}
_pushClosest (node) {
@ -193,15 +201,9 @@ module.exports = class Query extends Readable {
return 0
}
_onerror () {
this.errors++
this.inflight--
this._readMore()
}
_visit (node) {
this.inflight++
this.dht.request(this.target, this.command, this.value, node)
this.dht.request(this.target, this.command, this.value, node, { expectOk: false })
.then(this._onresolve, this._onreject)
}
}

3
lib/rpc.js

@ -139,6 +139,7 @@ module.exports = class RPC {
const total = this._win[0] + this._win[1] + this._win[2] + this._win[3]
const req = {
timeout: 2,
expectOk: !!(opts && opts.expectOk !== false),
tries: (opts && opts.retry === false) ? this.maxRetries : 0,
tid: m.tid,
buffer: state.buffer,
@ -200,7 +201,7 @@ module.exports = class RPC {
if (req === null) return
this.onresponse(m, this)
if (m.status === 0) {
if (m.status === 0 || req.expectOk === false) {
req.resolve(m)
} else {
req.reject(createStatusError(m.status))

Loading…
Cancel
Save