Browse Source

iterate based on feedback and testnet behaivor

session-estimator
Mathias Buus 3 years ago
parent
commit
eac1a5676d
  1. 853
      index.js
  2. 39
      lib/bind.js
  3. 8
      lib/errors.js
  4. 25
      lib/id.js
  5. 404
      lib/io.js
  6. 107
      lib/messages.js
  7. 117
      lib/nat-analyzer.js
  8. 49
      lib/peer.js
  9. 180
      lib/query.js
  10. 16
      lib/race.js
  11. 255
      lib/rpc.js
  12. 1
      package.json
  13. 77
      test.js

853
index.js

File diff suppressed because it is too large

39
lib/bind.js

@ -0,0 +1,39 @@
// TODO: move to module so we can have udp+tcp mode also on the same port etc etc
const dgram = require('dgram')
module.exports = async function bind (port) {
return new Promise((resolve, reject) => {
const socket = dgram.createSocket('udp4')
let tries = 1
socket.bind(port)
socket.on('listening', onlistening)
socket.on('error', onerror)
function onlistening () {
cleanup()
resolve(socket)
}
function onerror (err) {
if (port === 0 || tries >= 5) {
cleanup()
reject(err)
return
}
if (++tries < 5) {
socket.bind(++port)
} else {
port = 0
socket.bind(0)
}
}
function cleanup () {
socket.removeListener('error', onerror)
socket.removeListener('listening', onlistening)
}
})
}

8
lib/errors.js

@ -0,0 +1,8 @@
exports.BAD_COMMAND = 1
exports.BAD_TOKEN = 2
exports.TIMEOUT = new Error('Request timed out')
exports.TIMEOUT.code = 'ETIMEDOUT'
exports.DESTROY = new Error('Request destroyed')
exports.DESTROY.code = 'EDESTROYED'

25
lib/id.js

@ -1,25 +0,0 @@
const sodium = require('sodium-universal')
const addr = Buffer.alloc(6)
let i = 0
module.exports = hash
function num (ip) {
let n = 0
let c = 0
while (i < ip.length && (c = ip.charCodeAt(i++)) !== 46) n = n * 10 + (c - 48)
return n
}
function hash (ip, port, out = Buffer.allocUnsafe(32)) {
i = 0
addr[0] = num(ip)
addr[1] = num(ip)
addr[2] = num(ip)
addr[3] = num(ip)
addr[4] = port
addr[5] = port >>> 8
sodium.crypto_generichash(out, addr)
return out
}

404
lib/io.js

@ -0,0 +1,404 @@
const FIFO = require('fast-fifo')
const sodium = require('sodium-universal')
const c = require('compact-encoding')
const peer = require('./peer')
const bind = require('./bind')
const { BAD_TOKEN, TIMEOUT, DESTROY } = require('./errors')
const VERSION = 0b11
const RESPONSE_ID = (0b0001 << 4) | VERSION
const REQUEST_ID = (0b0000 << 4) | VERSION
const TMP = Buffer.alloc(32)
const EMPTY_ARRAY = []
module.exports = class IO {
constructor (table, { maxWindow = 80, bind = 0, firewalled = true, onrequest, onresponse = noop, ontimeout = noop } = {}) {
this.table = table
this.inflight = []
this.clientSocket = null
this.serverSocket = null
this.firewalled = firewalled !== false
this.ephemeral = true
this.congestion = new CongestionWindow(maxWindow)
this.onrequest = onrequest
this.onresponse = onresponse
this.ontimeout = ontimeout
this._pending = new FIFO()
this._rotateSecrets = 8
this._tid = (Math.random() * 65536) | 0
this._secrets = null
this._drainInterval = null
this._destroying = null
this._binding = null
this._bind = bind
}
onmessage (socket, buffer, rinfo) {
if (buffer.byteLength < 2) return
const from = { id: null, host: rinfo.address, port: rinfo.port }
const state = { start: 1, end: buffer.byteLength, buffer }
const expectedSocket = this.firewalled ? this.clientSocket : this.serverSocket
const external = socket !== expectedSocket
if (buffer[0] === REQUEST_ID) {
const req = Request.decode(this, socket, from, state)
if (req === null) return
if (req.token !== null && !req.token.equals(this.token(req.from, 1)) && !req.token.equals(this.token(req.from, 0))) {
req.error(BAD_TOKEN, { token: true })
return
}
this.onrequest(req, external)
return
}
if (buffer[0] === RESPONSE_ID) {
const res = decodeReply(from, state)
if (res === null) return
for (let i = 0; i < this.inflight.length; i++) {
const req = this.inflight[i]
if (req.tid !== res.tid) continue
if (i === this.inflight.length - 1) this.inflight.pop()
else this.inflight[i] = this.inflight.pop()
if (req._timeout) {
clearTimeout(req._timeout)
req._timeout = null
}
this.congestion.recv()
this.onresponse(res, external)
req.onresponse(res, req)
break
}
}
}
token (addr, i) {
if (this._secrets === null) {
const buf = Buffer.alloc(64)
this._secrets = [buf.subarray(0, 32), buf.subarray(32, 64)]
sodium.randombytes_buf(this._secrets[0])
sodium.randombytes_buf(this._secrets[1])
}
const token = Buffer.allocUnsafe(32)
sodium.crypto_generichash(token, Buffer.from(addr.host), this._secrets[i])
return token
}
async destroy () {
if (this._destroying) return this._destroying
// simplifies timing to await the bind here also, although it might be unneeded
await this.bind()
if (this._drainInterval) {
clearInterval(this._drainInterval)
this._drainInterval = null
}
while (this.inflight.length) {
const req = this.inflight.pop()
if (req._timeout) clearTimeout(req._timeout)
req._timeout = null
req.destroyed = true
req.onerror(DESTROY, req)
}
this._destroying = new Promise((resolve) => {
let missing = 2
this.serverSocket.close(done)
this.clientSocket.close(done)
function done () {
if (--missing === 0) resolve()
}
})
return this._destroying
}
bind () {
if (this._binding) return this._binding
this._binding = this._bindSockets()
return this._binding
}
async _bindSockets () {
this.serverSocket = typeof this._bind === 'function' ? this._bind() : await bind(this._bind)
try {
// TODO: we should reroll the socket is it's close to our preferred range of ports
// to avoid it being accidentally opened
// We'll prop need additional APIs for that
this.clientSocket = await bind(0)
} catch (err) {
await new Promise((resolve) => this.serverSocket.close(resolve))
this.serverSocket = null
throw err
}
this.serverSocket.on('message', this.onmessage.bind(this, this.serverSocket))
this.clientSocket.on('message', this.onmessage.bind(this, this.clientSocket))
if (this._drainInterval === null) {
this._drainInterval = setInterval(this._drain.bind(this), 750)
if (this._drainInterval.unref) this._drainInterval.unref()
}
for (const req of this.inflight) {
if (!req.socket) req.socket = this.firewalled ? this.clientSocket : this.serverSocket
req.sent = 0
req.send(false)
}
}
_drain () {
if (this._secrets !== null && --this._rotateSecrets === 0) {
this._rotateSecrets = 8
const tmp = this._secrets[0]
this._secrets[0] = this._secrets[1]
this._secrets[1] = tmp
sodium.crypto_generichash(tmp, tmp)
}
this.congestion.drain()
while (!this.congestion.isFull()) {
const p = this._pending.shift()
if (p === undefined) return
p._sendNow()
}
}
createRequest (to, token, command, target, value) {
if (this._destroying !== null) return null
if (this._tid === 65536) this._tid = 0
const tid = this._tid++
const socket = this.firewalled ? this.clientSocket : this.serverSocket
const req = new Request(this, socket, tid, null, to, token, command, target, value)
this.inflight.push(req)
return req
}
}
class Request {
constructor (io, socket, tid, from, to, token, command, target, value) {
this.socket = socket
this.tid = tid
this.from = from
this.to = to
this.token = token
this.command = command
this.target = target
this.value = value
this.sent = 0
this.destroyed = false
this.oncycle = noop
this.onerror = noop
this.onresponse = noop
this._buffer = null
this._io = io
this._timeout = null
}
static decode (io, socket, from, state) {
try {
const flags = c.uint.decode(state)
const tid = c.uint16.decode(state)
const to = peer.ipv4.decode(state)
const id = flags & 1 ? c.fixed32.decode(state) : null
const token = flags & 2 ? c.fixed32.decode(state) : null
const command = c.string.decode(state)
const target = flags & 4 ? c.fixed32.decode(state) : null
const value = flags & 8 ? c.buffer.decode(state) : null
if (id !== null) from.id = validateId(id, from)
return new Request(io, socket, tid, from, to, token, command, target, value)
} catch {
return null
}
}
reply (value, opts = {}) {
this.sendReply(0, value || null, opts.token !== false, this.target !== null && opts.closerNodes !== false)
}
error (code, opts = {}) {
this.sendReply(code, null, false, this.target !== null && opts.closerNodes !== false)
}
send (force = false) {
if (this.destroyed) return
if (this.socket === null) return
if (this._buffer === null) this._buffer = this._encodeRequest()
if (!force && this._io.congestion.isFull()) {
this._io._pending.push(this)
return
}
this._sendNow()
}
_sendNow () {
if (this.destroyed) return
this.sent++
this._io.congestion.send()
this.socket.send(this._buffer, 0, this._buffer.byteLength, this.to.port, this.to.host)
if (this._timeout) clearTimeout(this._timeout)
this._timeout = setTimeout(oncycle, 1000, this)
}
destroy (err) {
if (this.destroyed) return
this.destroyed = true
const i = this._io.inflight.indexOf(this)
if (i === -1) return
if (i === this._io.inflight.length - 1) this._io.inflight.pop()
else this._io.inflight[i] = this._io.inflight.pop()
this.onerror(err || DESTROY, this)
}
sendReply (error, value, token, hasCloserNodes) {
if (this.socket === null || this.destroyed) return
const id = this._io.ephemeral === false && this.socket === this._io.serverSocket
const closerNodes = hasCloserNodes ? this._io.table.closest(this.target) : EMPTY_ARRAY
const state = { start: 0, end: 1 + 1 + 6 + 2, buffer: null } // (type | version) + flags + to + tid
if (id) state.end += 32
if (token) state.end += 32
if (closerNodes.length > 0) peer.ipv4Array.preencode(state, closerNodes)
if (error > 0) c.uint.preencode(state, error)
if (value) c.buffer.preencode(state, value)
state.buffer = Buffer.allocUnsafe(state.end)
state.buffer[state.start++] = RESPONSE_ID
state.buffer[state.start++] = (id ? 1 : 0) | (token ? 2 : 0) | (closerNodes.length > 0 ? 4 : 0) | (error > 0 ? 8 : 0) | (value ? 16 : 0)
c.uint16.encode(state, this.tid)
peer.ipv4.encode(state, this.from)
if (id) c.fixed32.encode(state, this._io.table.id)
if (token) c.fixed32.encode(state, this._io.token(this.to, 1))
if (closerNodes.length > 0) peer.ipv4Array.encode(state, closerNodes)
if (error > 0) c.uint.encode(state, error)
if (value) c.buffer.encode(state, value)
this.socket.send(state.buffer, 0, state.buffer.byteLength, this.from.port, this.from.host)
}
_encodeRequest () {
const id = this._io.ephemeral === false && this.socket === this._io.serverSocket
const state = { start: 0, end: 1 + 1 + 6 + 2, buffer: null } // (type | version) + flags + to + tid
if (id) state.end += 32
if (this.token) state.end += 32
c.string.preencode(state, this.command)
if (this.target) state.end += 32
if (this.value) c.buffer.preencode(state, this.value)
state.buffer = Buffer.allocUnsafe(state.end)
state.buffer[state.start++] = REQUEST_ID
state.buffer[state.start++] = (id ? 1 : 0) | (this.token ? 2 : 0) | (this.target ? 4 : 0) | (this.value ? 8 : 0)
c.uint16.encode(state, this.tid)
peer.ipv4.encode(state, this.to)
if (id) c.fixed32.encode(state, this._io.table.id)
if (this.token) c.fixed32.encode(state, this.token)
c.string.encode(state, this.command)
if (this.target) c.fixed32.encode(state, this.target)
if (this.value) c.buffer.encode(state, this.value)
return state.buffer
}
}
class CongestionWindow {
constructor (maxWindow) {
this._i = 0
this._total = 0
this._window = [0, 0, 0, 0]
this._maxWindow = maxWindow
}
isFull () {
return this._total >= 2 * this._maxWindow || this._window[this._i] >= this._maxWindow
}
recv () {
if (this._window[this._i] > 0) {
this._window[this._i]--
this._total--
}
}
send () {
this._total++
this._window[this._i]++
}
drain () {
this._i = (this._i + 1) & 3
this._total -= this._window[this._i]
this._window[this._i] = 0 // clear oldest
}
}
function noop () {}
function oncycle (req) {
req._timeout = null
req.oncycle(req)
if (req.sent === 3) {
req.destroy(TIMEOUT)
req._io.ontimeout(req)
} else {
req.send()
}
}
function decodeReply (from, state) {
const flags = c.uint.decode(state)
const tid = c.uint16.decode(state)
const to = peer.ipv4.decode(state)
const id = flags & 1 ? c.fixed32.decode(state) : null
const token = flags & 2 ? c.fixed32.decode(state) : null
const closerNodes = flags & 4 ? peer.ipv4Array.decode(state) : null
const error = flags & 8 ? c.uint.decode(state) : 0
const value = flags & 16 ? c.buffer.decode(state) : null
if (id !== null) from.id = validateId(id, from)
try {
return { tid, from, to, token, closerNodes, error, value }
} catch {
return null
}
}
function validateId (id, from) {
return peer.id(from.host, from.port, TMP).equals(id) ? id : null
}

107
lib/messages.js

@ -1,107 +0,0 @@
const cenc = require('compact-encoding')
const IPv4 = exports.IPv4 = {
preencode (state, ip) {
state.end += 4
},
encode (state, ip) { // TODO: move over fast parser from ./id.js
const nums = ip.split('.')
state.buffer[state.start++] = Number(nums[0]) || 0
state.buffer[state.start++] = Number(nums[1]) || 0
state.buffer[state.start++] = Number(nums[2]) || 0
state.buffer[state.start++] = Number(nums[3]) || 0
},
decode (state) {
if (state.end - state.start < 4) throw new Error('Out of bounds')
return state.buffer[state.start++] + '.' + state.buffer[state.start++] + '.' + state.buffer[state.start++] + '.' + state.buffer[state.start++]
}
}
const peerIPv4 = exports.peerIPv4 = {
preencode (state, peer) {
state.end += 6
},
encode (state, peer) {
IPv4.encode(state, peer.host)
cenc.uint16.encode(state, peer.port)
},
decode (state) {
return {
host: IPv4.decode(state),
port: cenc.uint16.decode(state)
}
}
}
const peerIPv4Array = exports.peerIPv4Array = cenc.array(peerIPv4)
const IS_REQUEST = 0b0001
const HAS_ID = 0b0010
const HAS_TOKEN = 0b0100
const ROUTE_INFO = 0b1000 | IS_REQUEST
const HAS_TARGET = ROUTE_INFO | IS_REQUEST
const HAS_CLOSER_NODES = ROUTE_INFO ^ IS_REQUEST
exports.message = {
preencode (state, m) {
state.end += 1 // version
state.end += 1 // flags
state.end += 2 // tid
state.end += 6 // to
if (m.id) state.end += 32
if (m.token) state.end += 32
if (m.target) state.end += 32
if (m.closerNodes && m.closerNodes.length) peerIPv4Array.preencode(state, m.closerNodes)
if (m.command) cenc.string.preencode(state, m.command)
else cenc.uint.preencode(state, m.status)
cenc.buffer.preencode(state, m.value)
},
encode (state, m) {
const closerNodes = m.closerNodes || []
const flags = (m.id ? HAS_ID : 0) |
(m.token ? HAS_TOKEN : 0) |
(closerNodes.length ? HAS_CLOSER_NODES : 0) |
(m.target ? HAS_TARGET : 0) |
(m.command ? IS_REQUEST : 0)
state.buffer[state.start++] = 2
state.buffer[state.start++] = flags
cenc.uint16.encode(state, m.tid)
peerIPv4.encode(state, m.to)
if ((flags & HAS_ID) === HAS_ID) cenc.fixed32.encode(state, m.id)
if ((flags & HAS_TOKEN) === HAS_TOKEN) cenc.fixed32.encode(state, m.token)
if ((flags & ROUTE_INFO) === HAS_TARGET) cenc.fixed32.encode(state, m.target)
if ((flags & ROUTE_INFO) === HAS_CLOSER_NODES) peerIPv4Array.encode(state, closerNodes)
if ((flags & IS_REQUEST) === IS_REQUEST) cenc.string.encode(state, m.command)
if ((flags & IS_REQUEST) === 0) cenc.uint.encode(state, m.status)
cenc.buffer.encode(state, m.value)
},
decode (state) {
const version = state.buffer[state.start++]
if (version !== 2) {
throw new Error('Incompatible version')
}
const flags = cenc.uint.decode(state)
return {
version: 2,
tid: cenc.uint16.decode(state),
from: null, // populated in caller
to: peerIPv4.decode(state),
id: (flags & HAS_ID) === HAS_ID ? cenc.fixed32.decode(state) : null,
token: (flags & HAS_TOKEN) === HAS_TOKEN ? cenc.fixed32.decode(state) : null,
target: ((flags & ROUTE_INFO) === HAS_TARGET) ? cenc.fixed32.decode(state) : null,
closerNodes: ((flags & ROUTE_INFO) === HAS_CLOSER_NODES) ? peerIPv4Array.decode(state) : null,
command: ((flags & IS_REQUEST) === IS_REQUEST) ? cenc.string.decode(state) : null,
status: ((flags & IS_REQUEST) === 0) ? cenc.uint.decode(state) : 0,
value: cenc.buffer.decode(state)
}
}
}

117
lib/nat-analyzer.js

@ -1,117 +0,0 @@
// how far can the port median distance be?
const INCREMENTING_THRESHOLD = 200
class NatAnalyzer {
constructor (sampleSize) {
// sampleSize must be 2^n
this.samples = new Array(sampleSize)
this.length = 0
this.top = 0
}
sample (referrer) {
for (let i = 0; i < this.length; i++) {
const s = this.samples[i]
const r = s.referrer
if (r.port === referrer.port && r.host === referrer.host) return s
}
return null
}
add (addr, referrer) {
if (this.length < this.samples.length) this.length++
this.samples[this.top] = { port: addr.port, host: addr.host, dist: 0, referrer }
this.top = (this.top + 1) & (this.samples.length - 1)
}
analyze (minSamples = 3) {
if (this.length < minSamples) return { type: NatAnalyzer.UNKNOWN, host: null, port: 0 }
const samples = this.samples.slice(0, this.length)
const hosts = new Map()
let bestHost = null
let bestHits = 0
for (let i = 0; i < samples.length; i++) {
const host = samples[i].host
const hits = (hosts.get(host) || 0) + 1
hosts.set(host, hits)
if (hits > bestHits) {
bestHits = hits
bestHost = host
}
}
if (bestHits < (samples.length >> 1)) {
return { type: NatAnalyzer.UNKNOWN, host: null, port: 0 }
}
samples.sort(cmpPort)
let start = 0
let end = samples.length
let mid = samples[samples.length >> 1].port
// remove the 3 biggest outliers from the median if we have more than 6 samples
if (samples.length >= 6) {
for (let i = 0; i < 3; i++) {
const s = samples[start]
const e = samples[end - 1]
if (Math.abs(mid - s.port) < Math.abs(mid - e.port)) end--
else start++
}
}
const len = end - start
mid = samples[len >> 1].port
for (let i = 0; i < samples.length; i++) {
samples[i].dist = Math.abs(mid - samples[i].port)
}
// note that still sorts with the outliers which is why we just start=0, end=len-1 below
samples.sort(cmpDist)
mid = samples[len >> 1].dist
if (samples[0].dist === 0 && samples[len - 1].dist === 0) {
return {
type: NatAnalyzer.PORT_CONSISTENT,
host: bestHost,
port: samples[0].port
}
}
if (mid < INCREMENTING_THRESHOLD) {
return {
type: NatAnalyzer.PORT_INCREMENTING,
host: bestHost,
port: 0
}
}
return {
type: NatAnalyzer.PORT_RANDOMIZED,
host: bestHost,
port: 0
}
}
}
NatAnalyzer.UNKNOWN = Symbol.for('NAT_UNKNOWN')
NatAnalyzer.PORT_CONSISTENT = Symbol.for('NAT_PORT_CONSISTENT')
NatAnalyzer.PORT_INCREMENTING = Symbol.for('NAT_PORT_INCREMENTING')
NatAnalyzer.PORT_RANDOMIZED = Symbol.for('NAT_PORT_RANDOMIZED')
module.exports = NatAnalyzer
function cmpDist (a, b) {
return a.dist - b.dist
}
function cmpPort (a, b) {
return a.port - b.port
}

49
lib/peer.js

@ -0,0 +1,49 @@
const sodium = require('sodium-universal')
const c = require('compact-encoding')
const addr = Buffer.alloc(6)
let i = 0
const ipv4 = {
preencode (state, p) {
state.end += 6
},
encode (state, p) {
i = 0
state.buffer[state.start++] = num(p.host)
state.buffer[state.start++] = num(p.host)
state.buffer[state.start++] = num(p.host)
state.buffer[state.start++] = num(p.host)
state.buffer[state.start++] = p.port
state.buffer[state.start++] = p.port >>> 8
},
decode (state) {
if (state.end - state.start < 6) throw new Error('Out of bounds')
return {
id: null, // populated elsewhere
host: state.buffer[state.start++] + '.' + state.buffer[state.start++] + '.' + state.buffer[state.start++] + '.' + state.buffer[state.start++],
port: state.buffer[state.start++] + 256 * state.buffer[state.start++]
}
}
}
module.exports = { id, ipv4, ipv4Array: c.array(ipv4) }
function num (ip) {
let n = 0
let c = 0
while (i < ip.length && (c = ip.charCodeAt(i++)) !== 46) n = n * 10 + (c - 48)
return n
}
function id (ip, port, out = Buffer.allocUnsafe(32)) {
i = 0
addr[0] = num(ip)
addr[1] = num(ip)
addr[2] = num(ip)
addr[3] = num(ip)
addr[4] = port
addr[5] = port >>> 8
sodium.crypto_generichash(out, addr)
return out
}

180
lib/query.js

@ -1,6 +1,8 @@
const { Readable } = require('streamx')
const race = require('./race')
const nodeId = require('./id')
const peer = require('./peer')
const DONE = []
const DOWN = []
module.exports = class Query extends Readable {
constructor (dht, target, command, value, opts = {}) {
@ -16,17 +18,20 @@ module.exports = class Query extends Readable {
this.concurrency = opts.concurrency || this.dht.concurrency
this.inflight = 0
this.map = opts.map || defaultMap
this.maxSlow = opts.maxSlow === 0 ? 0 : (opts.maxSlow || 5)
this.closestReplies = []
this._slow = 0
this._slowdown = false
this._seen = new Set()
this._seen = new Map()
this._pending = []
this._onresolve = this._onvisit.bind(this)
this._onreject = this._onerror.bind(this)
this._fromTable = false
this._commit = opts.commit === true ? autoCommit : (opts.commit || null)
this._commiting = false
this._ropts = { socket: (opts && opts.socket) || this.dht.rpc.socket, expectOk: false }
this._onvisitbound = this._onvisit.bind(this)
this._onerrorbound = this._onerror.bind(this)
this._oncyclebound = this._oncycle.bind(this)
const nodes = opts.nodes || opts.closestNodes
@ -34,7 +39,7 @@ module.exports = class Query extends Readable {
// add them reverse as we pop below
for (let i = nodes.length - 1; i >= 0; i--) {
const node = nodes[i]
this._addPending(node.id || null, node.host, node.port)
this._addPending({ id: node.id || peer.id(node.host, node.port), host: node.host, port: node.port }, null)
}
}
}
@ -43,13 +48,7 @@ module.exports = class Query extends Readable {
const nodes = new Array(this.closestReplies.length)
for (let i = 0; i < nodes.length; i++) {
const c = this.closestReplies[i]
nodes[i] = {
id: c.id,
host: c.from.host,
port: c.from.port
}
nodes[i] = this.closestReplies[i].from
}
return nodes
@ -84,7 +83,7 @@ module.exports = class Query extends Readable {
const closest = this.dht.table.closest(this.target, this.k - this._pending.length)
for (const node of closest) {
this._addPending(node.id, node.host, node.port)
this._addPending({ id: node.id, host: node.host, port: node.port }, null)
}
}
@ -94,22 +93,42 @@ module.exports = class Query extends Readable {
this.dht._resolveBootstrapNodes((bootstrapNodes) => {
for (const node of bootstrapNodes) {
this._addPending(node.id, node.host, node.port)
this._addPending(node, null)
}
cb(null)
})
}
_isCloser (id) {
return this.closestReplies.length < this.k || this._compare(id, this.closestReplies[this.closestReplies.length - 1].id) < 0
return this.closestReplies.length < this.k || this._compare(id, this.closestReplies[this.closestReplies.length - 1].from.id) < 0
}
_addPending (id, host, port) {
if (id && !this._isCloser(id)) return false
const addr = host + ':' + port
if (this._seen.has(addr)) return true
this._seen.add(addr)
this._pending.push({ id, host, port })
_addPending (node, ref) {
const addr = node.host + ':' + node.port
const refs = this._seen.get(addr)
const isCloser = this._isCloser(node.id)
if (refs === DONE) {
return isCloser
}
if (refs === DOWN) {
if (ref) this._downHint(ref, node)
return isCloser
}
if (refs) {
if (ref !== null) refs.push(ref)
return isCloser
}
if (!isCloser) {
return false
}
this._seen.set(addr, ref === null ? [] : [ref])
this._pending.push(node)
return true
}
@ -119,9 +138,9 @@ module.exports = class Query extends Readable {
}
_readMore () {
if (this.destroying) return
if (this.destroying || this._commiting) return
const concurrency = this._slowdown ? 3 : this.concurrency
const concurrency = (this._slowdown ? 3 : this.concurrency) + this._slow
while (this.inflight < concurrency && this._pending.length > 0) {
const next = this._pending.pop()
@ -135,11 +154,15 @@ module.exports = class Query extends Readable {
this._slowdown = true
}
if (this.inflight === 0 && this._pending.length === 0) {
if (this._pending.length > 0) return
// if no inflight OR all the queries we are waiting on are marked as slow (within our limits) and we have a full result.
if (this.inflight === 0 || (this._slow <= this.maxSlow && this._slow === this.inflight && this.closestReplies.length >= this.k)) {
// if more than 3/4 failed and we only used cached nodes, try again from the routing table
if (!this._fromTable && this.successes < this.k / 4) {
this._addFromTable()
return this._readMore()
this._readMore()
return
}
this._flush()
@ -147,41 +170,73 @@ module.exports = class Query extends Readable {
}
_flush () {
if (this._commiting) return
this._commiting = true
if (this._commit === null) {
this.push(null)
return
}
if (!this.closestReplies.length) {
const p = []
for (const m of this.closestReplies) p.push(this._commit(m, this.dht, this))
this._endAfterCommit(p)
}
_endAfterCommit (ps) {
if (!ps.length) {
this.destroy(new Error('Too few nodes responded'))
return
}
if (this._commiting) return
this._commiting = true
const self = this
const p = []
for (const m of this.closestReplies) p.push(this._commit(m, this.dht, this))
let pending = ps.length
let success = 0
race(p, 1, p.length)
.then(() => this.push(null), (err) => this.destroy(err))
}
for (const p of ps) p.then(ondone, onerror)
_onvisit (m) {
if (m.status === 0) this.successes++
else this.errors++
function ondone () {
success++
if (--pending === 0) self.push(null)
}
this.inflight--
function onerror (err) {
if (--pending > 0) return
if (success) self.push(null)
else self.destroy(err)
}
}
if (m.status === 0 && m.id !== null && this._isCloser(m.id)) {
this._pushClosest(m)
_dec (req) {
if (req.oncycle === noop) {
this._slow--
} else {
req.oncycle = noop
}
this.inflight--
}
_onvisit (m, req) {
this._dec(req)
const addr = req.to.host + ':' + req.to.port
this._seen.set(addr, DONE)
if (this._commiting) return
if (m.error === 0) this.successes++
else this.errors++
if (m.error === 0 && m.from.id !== null && this._isCloser(m.from.id)) this._pushClosest(m)
if (m.closerNodes !== null) {
for (const node of m.closerNodes) {
const id = nodeId(node.host, node.port)
if (id.equals(this.dht.table.id)) continue
if (!this._addPending(id, node.host, node.port)) break
node.id = peer.id(node.host, node.port)
if (node.id.equals(this.dht.table.id)) continue
// TODO: we could continue here instead of breaking to ensure that one of the nodes in the closer list
// is later marked as DOWN that we gossip that back
if (!this._addPending(node, m.from)) break
}
}
@ -189,7 +244,7 @@ module.exports = class Query extends Readable {
this._slowdown = false
}
if (m.status !== 0) {
if (m.error !== 0) {
this._readMore()
return
}
@ -200,17 +255,35 @@ module.exports = class Query extends Readable {
}
}
_onerror () {
_onerror (_, req) {
const addr = req.to.host + ':' + req.to.port
const refs = this._seen.get(addr)
this._seen.set(addr, DOWN)
for (const node of refs) this._downHint(node, req.to)
this._dec(req)
this.errors++
this.inflight--
this._readMore()
}
_oncycle (req) {
req.oncycle = noop
this._slow++
this._readMore()
}
_downHint (node, down) {
const state = { start: 0, end: 6, buffer: Buffer.allocUnsafe(6) }
peer.ipv4.encode(state, down)
this.dht._request(node, 'down_hint', null, state.buffer, noop, noop)
}
_pushClosest (m) {
this.closestReplies.push(m)
for (let i = this.closestReplies.length - 2; i >= 0; i--) {
const prev = this.closestReplies[i]
const cmp = this._compare(prev.id, m.id)
const cmp = this._compare(prev.from.id, m.from.id)
// if sorted, done!
if (cmp < 0) break
// if dup, splice it out (rare)
@ -234,18 +307,21 @@ module.exports = class Query extends Readable {
return 0
}
_visit (node) {
_visit (to) {
this.inflight++
this.dht.request(this.target, this.command, this.value, node, this._ropts)
.then(this._onresolve, this._onreject)
const req = this.dht._request(to, this.command, this.target, this.value, this._onvisitbound, this._onerrorbound)
req.oncycle = this._oncyclebound
}
}
function autoCommit (reply, dht, query) {
if (!reply.token) return Promise.reject(new Error('No token received for closest node'))
return dht.request(query.target, query.command, query.value, reply.from, { token: reply.token })
return dht.request({ token: reply.token, target: query.target, command: query.command, value: query.value }, reply.from)
}
function defaultMap (m) {
return m
}
function noop () {}

16
lib/race.js

@ -1,16 +0,0 @@
module.exports = async function race (p, min = 1, max = p.length) {
let errors = 0
const results = []
// avoid unhandled rejections after early return/throw
for (const promise of p) promise.catch(() => {})
for (let i = 0; i < p.length; i++) {
try {
const res = await p[i]
if (results.length < max) results.push(res)
if (results.length >= max) return results
if (results.length + errors === p.length) return results
} catch {
if ((p.length - ++errors) < min) throw new Error('Too many requests failed')
}
}
}

255
lib/rpc.js

@ -1,255 +0,0 @@
const dgram = require('dgram')
const { message } = require('./messages')
module.exports = class RPC {
constructor (opts = {}) {
this._pendingSends = []
this._tid = (Math.random() * 65536) | 0
this._drainInterval = null
this._tick = 0
this._w = 0
this._win = [0, 0, 0, 0]
this._bind = opts.bind || 0
this._bound = false
this._binding = null
this.maxWindow = opts.maxWindow || 80 // ~100 per second burst, ~80 per second avg
this.maxRetries = 3
this.destroyed = false
this.inflight = []
this.onrequest = opts.onrequest || noop
this.onresponse = opts.onresponse || noop
this.onwarning = opts.onwarning || noop
this.socket = opts.socket || dgram.createSocket('udp4')
this.socket.on('message', this.onmessage.bind(this, true))
}
get inflightRequests () {
return this.inflight.length
}
send (m, socket = this.socket) {
const state = { start: 0, end: 0, buffer: null }
message.preencode(state, m)
state.buffer = Buffer.allocUnsafe(state.end)
message.encode(state, m)
this._send(socket, state.buffer, m.to)
}
reply (req, reply, socket = this.socket) {
reply.tid = req.tid
reply.to = req.from
this.send(reply, socket)
}
address () {
return this.socket.address()
}
bind (port = this._bind) {
if (this._binding) return this._binding
const self = this
this._binding = new Promise((resolve, reject) => {
const s = this.socket
s.bind(port)
s.on('listening', onlistening)
s.on('error', onerror)
function onlistening () {
self._bound = true
s.removeListener('listening', onlistening)
s.removeListener('error', onerror)
resolve(s.address().port)
}
function onerror (err) {
// retry on any port if preferred port is unavail
if (port === self._bind && port !== 0) {
port = 0
s.bind(0)
return
}
s.removeListener('listening', onlistening)
s.removeListener('error', onerror)
reject(err)
}
})
return this._binding
}
destroy () {
if (this.destroyed) return
this.unwrap(true)
this.socket.close()
}
unwrap (closing = false) {
if (this.destroyed) return
this.destroyed = true
clearInterval(this._drainInterval)
this.socket.removeAllListeners()
for (const req of this.inflight) {
req.reject(new Error('RPC socket destroyed'))
}
this.inflight = []
return this.socket
}
async request (m, opts) {
if (this.destroyed) throw new Error('RPC socket destroyed')
const socket = (opts && opts.socket) || this.socket
if (!this._bound && socket === this.socket) await this.bind()
if (this._drainInterval === null) {
this._drainInterval = setInterval(this._drain.bind(this), 750)
if (this._drainInterval.unref) this._drainInterval.unref()
}
m.tid = this._tid++
if (this._tid === 65536) this._tid = 0
const state = { start: 0, end: 0, buffer: null }
message.preencode(state, m)
state.buffer = Buffer.allocUnsafe(state.end)
message.encode(state, m)
return new Promise((resolve, reject) => {
const total = this._win[0] + this._win[1] + this._win[2] + this._win[3]
const req = {
socket,
timeout: 2,
expectOk: !!(opts && opts.expectOk !== false),
tries: (opts && opts.retry === false) ? this.maxRetries : 0,
tid: m.tid,
buffer: state.buffer,
to: m.to,
resolve,
reject
}
this.inflight.push(req)
if (total < 2 * this.maxWindow && this._win[this._w] < this.maxWindow) {
this._win[this._w]++
req.tries++
this._send(req.socket, req.buffer, req.to)
}
})
}
onmessage (sample, buffer, rinfo) {
const from = { host: rinfo.address, port: rinfo.port }
if (!from.port) return
if (buffer.byteLength <= 1) return
const state = { start: 0, end: buffer.byteLength, buffer }
let m = null
try {
m = message.decode(state)
} catch (err) {
this.onwarning(err)
return
}
m.from = from
if (m.command !== null) { // request
if (this.onrequest === noop) return
this.onrequest(m, sample)
return
}
const req = this._dequeue(m.tid)
if (req === null) return
if (m.id && (req.to.port !== from.port || req.to.host !== from.host)) m.id = null
// decrement the inflight window as this is an "ack"
if (this._win[this._w] > 0) this._win[this._w]--
this.onresponse(m, sample)
if (m.status === 0 || req.expectOk === false) {
req.resolve(m)
} else {
req.reject(createStatusError(m.status))
}
}
_send (socket, buf, addr) {
if (this.destroyed) return
socket.send(buf, 0, buf.byteLength, addr.port, addr.host)
}
_dequeue (tid) {
for (let i = 0; i < this.inflight.length; i++) {
const req = this.inflight[i]
if (req.tid === tid) {
if (i === this.inflight.length - 1) this.inflight.pop()
else this.inflight[i] = this.inflight.pop()
return req
}
}
return null
}
_drain () {
let total = this._win[0] + this._win[1] + this._win[2] + this._win[3]
for (let i = 0; i < this.inflight.length; i++) {
const req = this.inflight[i]
if (req.tries > 0 && --req.timeout >= 0) continue
req.timeout = 2
if (req.tries++ > this.maxRetries) {
if (i === this.inflight.length - 1) this.inflight.pop()
else this.inflight[i] = this.inflight.pop()
req.reject(createTimeoutError())
continue
}
if (total >= 2 * this.maxWindow || this._win[this._w] >= this.maxWindow) {
req.tries--
continue
}
total++
this._win[this._w]++
this._send(req.socket, req.buffer, req.to)
}
this._w = (this._w + 1) & 3
this._win[this._w] = 0 // clear oldest
}
}
function createTimeoutError () {
const err = new Error('Request timed out')
err.status = 0
return err
}
function createStatusError (status) {
const err = new Error('Request failed with status ' + status)
err.status = status
return err
}
function noop () {}

1
package.json

@ -7,6 +7,7 @@
"compact-encoding": "^2.1.0",
"fast-fifo": "^1.0.0",
"kademlia-routing-table": "^1.0.0",
"nat-sampler": "^1.0.1",
"sodium-universal": "^3.0.4",
"streamx": "^2.10.3",
"time-ordered-set": "^1.0.2"

77
test.js

@ -12,14 +12,15 @@ tape('make bigger swarm', async function (t) {
const swarm = await makeSwarm(500)
const targetNode = swarm[25]
const target = targetNode.id
let q = swarm[499].query(targetNode.id, 'find_node', null)
let q = swarm[499].query({ command: 'find_node', target })
let messages = 0
let found = false
for await (const data of q) {
messages++
if (data.id && data.id.equals(targetNode.id)) {
if (data.from.id && data.from.id.equals(target)) {
found = true
break
}
@ -27,13 +28,13 @@ tape('make bigger swarm', async function (t) {
t.ok(found, 'found target in ' + messages + ' message(s)')
q = swarm[490].query(targetNode.id, 'find_node', null, { nodes: q.closestNodes })
q = swarm[490].query({ command: 'find_node', target }, { nodes: q.closestNodes })
messages = 0
found = false
for await (const data of q) {
messages++
if (data.id && data.id.equals(targetNode.id)) {
if (data.from.id && data.from.id.equals(target)) {
found = true
break
}
@ -41,37 +42,15 @@ tape('make bigger swarm', async function (t) {
t.ok(found, 'found target again in ' + messages + ' message(s)')
const { type, host, port } = swarm[490].remoteAddress()
const { firewalled, host, port } = swarm[490]
t.same(type, DHT.NAT_OPEN)
t.same(firewalled, false)
t.same(port, swarm[490].address().port)
t.ok(host)
destroy(swarm)
})
tape('nat sample promise', async function (t) {
const swarm = await makeSwarm(5)
const node = new DHT({
bootstrap: [{ host: '127.0.0.1', port: swarm[0].address().port }]
})
let ready = false
node.ready().then(() => {
ready = true
})
await node.sampledNAT()
t.ok(node._nat.length >= 3, 'min 3 samples')
t.notOk(ready, 'before ready')
await node.ready()
t.ok(ready, 'after ready')
node.destroy()
destroy(swarm)
})
tape('commit after query', async function (t) {
const swarm = await makeSwarm(100)
@ -82,16 +61,16 @@ tape('commit after query', async function (t) {
if (req.command === 'before') {
return req.reply(null)
}
if (req.command === 'after' && req.commit) {
if (req.command === 'after' && req.token) {
commits++
return req.reply(null)
}
})
}
const q = swarm[42].query(swarm[0].table.id, 'before', null, {
const q = swarm[42].query({ command: 'before', target: swarm[0].table.id }, {
commit (m, dht, query) {
return dht.request(query.target, 'after', null, m.from, { token: m.token })
return dht.request({ command: 'after', target: query.target, token: m.token }, m.from)
}
})
@ -106,11 +85,11 @@ tape('map query stream', async function (t) {
const swarm = await makeSwarm(10)
const expected = []
const q = swarm[0].query(swarm[0].table.id, 'find_node', null, {
const q = swarm[0].query({ command: 'find_node', target: swarm[0].table.id }, {
map (data) {
if (expected.length > 3) return null
expected.push(data.id)
return data.id
expected.push(data.from.id)
return data.from.id
}
})
@ -119,7 +98,9 @@ tape('map query stream', async function (t) {
await q.finished()
t.ok(expected.length > 0)
t.same(buf, expected)
destroy(swarm)
})
@ -134,10 +115,10 @@ tape('timeouts', async function (t) {
}
})
const q = a.query(Buffer.alloc(32), 'nope')
const q = a.query({ command: 'nope', target: Buffer.alloc(32) })
await q.finished()
t.same(tries, 4)
t.same(tries, 3)
bootstrap.destroy()
a.destroy()
@ -146,20 +127,24 @@ tape('timeouts', async function (t) {
tape('shorthand commit', async function (t) {
const swarm = await makeSwarm(40)
let tokens = 0
let notTokens = 0
for (const node of swarm) {
node.on('request', function (req) {
if (req.commit) tokens++
if (req.token) tokens++
else notTokens++
req.reply(null)
})
}
const q = swarm[0].query(Buffer.alloc(32), 'nope', null, { commit: true })
const q = swarm[0].query({ command: 'hello', target: Buffer.alloc(32) }, { commit: true })
await q.finished()
t.same(tokens, 20)
t.ok(notTokens >= tokens)
destroy(swarm)
})
@ -192,7 +177,7 @@ tape('timeouts when commiting', async function (t) {
}
})
const q = a.query(Buffer.alloc(32), 'nope', null, { commit: true })
const q = a.query({ command: 'nope', target: Buffer.alloc(32) }, { commit: true })
let error = null
try {
@ -202,7 +187,7 @@ tape('timeouts when commiting', async function (t) {
}
t.ok(error, 'commit should fail')
t.same(tries, 4)
t.same(tries, 3)
bootstrap.destroy()
a.destroy()
@ -237,10 +222,10 @@ tape('addNode / nodes option', async function (t) {
const bNodes = b.toArray()
t.deepEqual(bNodes, [{ host: '127.0.0.1', port: a.address().port }])
t.same(bNodes, [{ host: '127.0.0.1', port: a.address().port }])
const responses = []
for await (const data of b.query(a.id, 'hello')) {
for await (const data of b.query({ command: 'hello', target: a.id })) {
responses.push(data)
}
@ -249,7 +234,7 @@ tape('addNode / nodes option', async function (t) {
const aNodes = a.toArray()
t.deepEqual(aNodes, [{ host: '127.0.0.1', port: b.address().port }])
t.same(aNodes, [{ host: '127.0.0.1', port: b.address().port }])
a.destroy()
b.destroy()
@ -259,7 +244,7 @@ tape('addNode / nodes option', async function (t) {
tape('set bind', async function (t) {
const port = await freePort()
const a = new DHT({ bind: port })
const a = new DHT({ bind: port, firewalled: false })
await a.ready()
t.same(a.address().port, port, 'bound to explicit port')
@ -290,8 +275,8 @@ function freePort () {
}
async function makeSwarm (n) {
const node = new DHT()
await node.bind(0)
const node = DHT.bootstrapper()
await node.ready()
const all = [node]
const bootstrap = ['localhost:' + node.address().port]
while (all.length < n) {

Loading…
Cancel
Save