Browse Source

add nat check, change defaults

session-estimator
Mathias Buus 4 years ago
parent
commit
1e27ca7dbb
  1. 72
      index.js
  2. 2
      lib/query.js
  3. 25
      lib/rpc.js

72
index.js

@ -8,6 +8,7 @@ const Table = require('kademlia-routing-table')
const TOS = require('time-ordered-set')
const FIFO = require('fast-fifo/fixed-size')
const sodium = require('sodium-universal')
const dgram = require('dgram')
const { EventEmitter } = require('events')
const TICK_INTERVAL = 5000
@ -40,12 +41,12 @@ class Request {
return this.token !== null
}
error (code, token = false) {
return this.dht._reply(this.tid, this.target, code, null, this.from, token)
error (code, token = false, socket) {
return this.dht._reply(this.tid, this.target, code, null, this.from, token, socket)
}
reply (value, token = true) {
return this.dht._reply(this.tid, this.target, 0, value, this.from, token)
reply (value, token = true, socket) {
return this.dht._reply(this.tid, this.target, 0, value, this.from, token, socket)
}
}
@ -67,17 +68,17 @@ class DHT extends EventEmitter {
})
this.bootstrapped = false
this.concurrency = opts.concurrency || 16
this.concurrency = opts.concurrency || this.table.k
this.ephemeral = true
this.adaptive = opts.ephemeral === false || opts.adaptive !== false
this.adaptive = opts.ephemeral !== false && opts.adaptive !== false
this.clientOnly = !this.adaptive
this._forcePersistent = opts.ephemeral === false
this._repinging = 0
this._reping = new FIFO(128)
this._bootstrapping = this.bootstrap()
// make sure to random offset all the network ticks
this._tick = randomOffset(100)
this._localSocket = opts.localSocket || null // used for auto configuring nat sessions
this._tick = randomOffset(100) // make sure to random offset all the network ticks
this._refreshTicks = randomOffset(REFRESH_TICKS)
this._pingBootstrapTicks = randomOffset(REFRESH_TICKS)
this._persistentTicks = this.adaptive ? PERSISTENT_TICKS : 0
@ -152,13 +153,17 @@ class DHT extends EventEmitter {
destroy () {
this.rpc.destroy()
if (this._localSocket) {
this._localSocket.close()
this._localSocket = null
}
clearInterval(this._tickInterval)
}
async bootstrap () {
for (let i = 0; i < 2; i++) {
await this._backgroundQuery(this.table.id, 'find_node', null).finished()
if (this.bootstrapped || !this._forcePersistent || !this._onpersistent()) break
if (this.bootstrapped || !this._forcePersistent || !(await this._onpersistent())) break
}
if (this.bootstrapped) return
@ -275,7 +280,7 @@ class DHT extends EventEmitter {
if (!this.bootstrapped) return
if (this.adaptive && this.ephemeral && --this._persistentTicks <= 0) {
this._onpersistent()
this._onpersistent() // the promise returned here never fails so just ignore it
}
if ((this._tick & 7) === 0) {
@ -291,7 +296,7 @@ class DHT extends EventEmitter {
}
}
_onpersistent () {
async _onpersistent () {
if (this.ephemeral === false) return false
// TODO: do nat check also
@ -303,6 +308,8 @@ class DHT extends EventEmitter {
return false
}
if (await this._checkIfFirewalled()) return false
const id = nodeId(addr.host, addr.port)
if (this.table.id.equals(id)) return false
@ -317,6 +324,34 @@ class DHT extends EventEmitter {
return true
}
async _addBootstrapNodes (nodes) {
return new Promise((resolve) => {
this._resolveBootstrapNodes(function (bootstrappers) {
nodes.push(...bootstrappers)
resolve()
})
})
}
async _checkIfFirewalled () {
const nodes = []
for (let node = this.nodes.latest; node && nodes.length < 5; node = node.prev) {
nodes.push(node)
}
if (nodes.length < 5) await this._addBootstrapNodes(nodes)
if (!nodes.length) return true // no nodes available, including bootstrappers - bail
try {
await this.requestAll(null, 'ping_nat', null, nodes, { min: 1, max: 3 })
} catch {
// not enough nat pings succeded - assume firewalled
return true
}
return false
}
_onwakeup () {
this._tick += 2 * OLD_NODE // bump the tick enough that everything appears old.
this._tick += 8 - (this._tick & 7) - 2 // triggers a series of pings in two ticks
@ -467,6 +502,12 @@ class DHT extends EventEmitter {
return
}
if (req.command === 'ping_nat') {
if (!this._localSocket) this._localSocket = dgram.createSocket('udp4')
this._reply(req.tid, null, 0, null, req.from, false, this._localSocket)
return
}
// empty dht reply back
if (req.command === 'find_node') {
this._reply(req.tid, req.target, 0, null, req.from, false)
@ -495,15 +536,16 @@ class DHT extends EventEmitter {
return this._nat.analyze(minSamples)
}
_reply (tid, target, status, value, to, addToken) {
_reply (tid, target, status, value, to, addToken, socket = this.rpc.socket) {
const closerNodes = target ? this.table.closest(target) : null
const ephemeral = socket !== this.rpc.socket || this.ephemeral
const reply = {
version: 1,
tid,
from: null,
to,
id: this.ephemeral ? null : this.table.id,
token: addToken ? this._token(to, 1) : null,
id: ephemeral ? null : this.table.id,
token: (!ephemeral && addToken) ? this._token(to, 1) : null,
target: null,
closerNodes,
command: null,
@ -511,7 +553,7 @@ class DHT extends EventEmitter {
value
}
this.rpc.send(reply)
this.rpc.send(reply, socket)
return reply
}
}

2
lib/query.js

@ -13,7 +13,7 @@ module.exports = class Query extends Readable {
this.value = value
this.errors = 0
this.successes = 0
this.concurrency = opts.concurrency || this.k
this.concurrency = opts.concurrency || this.dht.concurrency
this.inflight = 0
this.map = opts.map || defaultMap
this.closests = []

25
lib/rpc.js

@ -18,27 +18,27 @@ module.exports = class RPC {
this.onresponse = opts.onresponse || noop
this.onwarning = opts.onwarning || noop
this.socket = opts.socket || dgram.createSocket('udp4')
this.socket.on('message', this._onmessage.bind(this))
this.socket.on('message', this.onmessage.bind(this))
}
get inflightRequests () {
return this.inflight.length
}
send (m) {
send (m, socket = this.socket) {
const state = { start: 0, end: 0, buffer: null }
message.preencode(state, m)
state.buffer = Buffer.allocUnsafe(state.end)
message.encode(state, m)
this._send(state.buffer, m.to)
this._send(socket, state.buffer, m.to)
}
reply (req, reply) {
reply (req, reply, socket = this.socket) {
reply.tid = req.tid
reply.to = req.from
this.send(reply)
this.send(reply, socket)
}
address () {
@ -113,6 +113,7 @@ module.exports = class RPC {
return new Promise((resolve, reject) => {
const total = this._win[0] + this._win[1] + this._win[2] + this._win[3]
const req = {
socket: (opts && opts.socket) || this.socket,
timeout: 2,
expectOk: !!(opts && opts.expectOk !== false),
tries: (opts && opts.retry === false) ? this.maxRetries : 0,
@ -128,16 +129,12 @@ module.exports = class RPC {
if (total < 2 * this.maxWindow && this._win[this._w] < this.maxWindow) {
this._win[this._w]++
req.tries++
this._send(req.buffer, req.to)
this._send(req.socket, req.buffer, req.to)
}
})
}
_onutpconnection (socket) {
this.onconnection(socket, this)
}
_onmessage (buffer, rinfo) {
onmessage (buffer, rinfo) {
const from = { host: rinfo.address, port: rinfo.port }
if (!from.port) return
@ -176,9 +173,9 @@ module.exports = class RPC {
}
}
_send (buf, addr) {
_send (socket, buf, addr) {
if (this.destroyed) return
this.socket.send(buf, 0, buf.byteLength, addr.port, addr.host)
socket.send(buf, 0, buf.byteLength, addr.port, addr.host)
}
_dequeue (tid) {
@ -218,7 +215,7 @@ module.exports = class RPC {
total++
this._win[this._w]++
this._send(req.buffer, req.to)
this._send(req.socket, req.buffer, req.to)
}
this._w = (this._w + 1) & 3

Loading…
Cancel
Save