Browse Source

add rpc backpressure through reqs-sent/s and rework lots of stuff. start firewall analysis impl

session-estimator
Mathias Buus 4 years ago
parent
commit
21c6d5b186
  1. 42
      index.js
  2. 153
      lib/query.js
  3. 42
      lib/rpc.js
  4. 5
      package.json
  5. 61
      test.js

42
index.js

@ -43,7 +43,7 @@ class DHT extends EventEmitter {
const id = opts.id || randomBytes(32)
this.bootstrapNodes = (opts.bootstrapNodes || []).map(parseNode)
this.bootstrapNodes = opts.bootstrap === false ? [] : (opts.bootstrap || []).map(parseNode)
this.nodes = new TOS()
this.table = new Table(id)
this.rpc = new RPC({
@ -68,6 +68,10 @@ class DHT extends EventEmitter {
this.table.on('row', (row) => row.on('full', (node) => this._onfullrow(node, row)))
}
get id () {
return this.table.id
}
static createRPCSocket (opts) {
return new RPC(opts)
}
@ -168,6 +172,32 @@ class DHT extends EventEmitter {
this._backgroundQuery(node ? node.id : this.table.id, 'find_node', null)
}
_tally (onlyIp) {
const sum = new Map()
let result = null
let node = this.nodes.latest
let cnt = 0
let good = 0
for (; node && cnt < 10; node = node.prev) {
const to = node.to.host + ':' + (onlyIp ? 0 : node.to.port)
const hits = 1 + (sum.get(to) || 0)
if (hits > good) {
good = hits
result = node.to
}
sum.set(to, hits)
cnt++
}
// We want at least 3 samples all with the same ip:port from
// different remotes (the to field) to be consider it consistent
// If we get >=3 samples with conflicting info we are not (or under attack) (Subject for tweaking)
const bad = cnt - good
return bad < 3 && good >= 3 ? result : null
}
_pingSome () {
let cnt = this.rpc.inflightRequests > 2 ? 3 : 5
let oldest = this.nodes.oldest
@ -294,6 +324,7 @@ class DHT extends EventEmitter {
if (oldNode) {
if (oldNode.port === m.from.port && oldNode.host === m.from.host) {
// refresh it
oldNode.to = m.to
oldNode.seen = this._tick
this.nodes.add(oldNode)
}
@ -302,9 +333,10 @@ class DHT extends EventEmitter {
this._addNode({
id: m.nodeId,
token: null,
port: m.from.port,
host: m.from.host,
token: null,
to: m.to,
added: this._tick,
seen: this._tick,
prev: null,
@ -345,6 +377,10 @@ class DHT extends EventEmitter {
return this.rpc.bind(...args)
}
address () {
return this.rpc.address()
}
_reply (rpc, tid, target, status, value, token, to) {
const closerNodes = target ? this.table.closest(target) : null
const persistent = !this.ephemeral && rpc === this.rpc
@ -379,7 +415,7 @@ function parseNode (s) {
return {
id: id ? Buffer.from(id.slice(0, -1), 'hex') : null,
host,
port
port: Number(port.slice(1))
}
}

153
lib/query.js

@ -1,4 +1,3 @@
const Table = require('kademlia-routing-table')
const { Readable } = require('streamx')
module.exports = class Query extends Readable {
@ -6,25 +5,33 @@ module.exports = class Query extends Readable {
super()
this.dht = dht
this.table = opts.table || new Table(target, { k: 20 })
this.k = this.dht.table.k
this.target = target
this.command = command
this.value = value
this.errors = 0
this.successes = 0
this.concurrency = opts.concurrency || 16
this.concurrency = opts.concurrency || this.k
this.inflight = 0
this.map = opts.map || defaultMap
this.closest = []
this._slowdown = false
this._seen = new Set()
this._pending = []
this._onresolve = this._onvisit.bind(this)
this._onreject = this._onerror.bind(this)
}
this._fromTable = false
get target () {
return this.table.id
}
const nodes = opts.nodes || opts.closest
closest () {
return this.table.closest(this.table.id)
if (nodes) {
// add them reverse as we pop below
for (let i = nodes.length - 1; i >= 0; i--) {
const node = nodes[i]
this._addPending(node.id, node.host, node.port)
}
}
}
finished () {
@ -51,7 +58,7 @@ module.exports = class Query extends Readable {
async commit (command = this.command, value = this.value, opts) {
if (typeof command === 'object' && command) return this.commit(undefined, undefined, command)
return this.dht.requestAll(this.table.id, command, value, this.closest(), opts)
return this.dht.requestAll(this.target, command, value, this.closest, opts)
}
async toArray () {
@ -61,45 +68,41 @@ module.exports = class Query extends Readable {
return all
}
_open (cb) {
let cnt = 0
// we need to do this in case of table reuse
for (const node of this.table.closest(this.table.id)) {
node.visited = false
cnt++
}
_addFromTable () {
if (this._pending.length >= this.k) return
this._fromTable = true
const closest = this.dht.table.closest(this.table.id)
const closest = this.dht.table.closest(this.target, this.k - this._pending.length)
for (const node of closest) {
cnt++
this.table.add({
visited: false,
id: node.id,
token: null,
port: node.port,
host: node.host
})
this._addPending(node.id, node.host, node.port)
}
}
if (cnt >= this.concurrency) return cb(null)
_open (cb) {
this._addFromTable()
if (this._pending.length >= this.k) return cb(null)
this.dht._resolveBootstrapNodes((bootstrapNodes) => {
for (const node of bootstrapNodes) {
this._visit({
visited: false,
id: node.id,
token: null,
port: node.port,
host: node.host
})
this._addPending(node.id, node.host, node.port)
}
cb(null)
})
}
_isCloser (id) {
return this.closest.length < this.k || this._compare(id, this.closest[this.closest.length - 1].id) < 0
}
_addPending (id, host, port) {
if (id && !this._isCloser(id)) return
const addr = host + ':' + port
if (this._seen.has(addr)) return
this._seen.add(addr)
this._pending.push({ id, host, port })
}
_read (cb) {
this._readMore()
cb(null)
@ -108,15 +111,27 @@ module.exports = class Query extends Readable {
_readMore () {
if (this.destroying) return
const closest = this.table.closest(this.table.id)
const concurrency = this._slowdown ? 3 : this.concurrency
for (const node of closest) {
if (node.visited) continue
if (this.inflight >= this.concurrency) return
this._visit(node)
while (this.inflight < concurrency && this._pending.length > 0) {
const next = this._pending.pop()
if (next && next.id && !this._isCloser(next.id)) continue
this._visit(next)
}
if (this.inflight === 0) {
// if reusing closest nodes, slow down after the first readMore tick to allow
// the closests node a chance to reply before going broad to question more
if (!this._fromTable && this.successes === 0 && this.errors === 0) {
this._slowdown = true
}
if (this.inflight === 0 && this._pending.length === 0) {
// if more than 3/4 failed and we only used cached nodes, try again from the routing table
if (!this._fromTable && this.successes < this.k / 4) {
this._addFromTable()
return this._readMore()
}
this.push(null)
}
}
@ -125,33 +140,59 @@ module.exports = class Query extends Readable {
this.successes++
this.inflight--
if (m.nodeId !== null) {
this.table.add({
visited: true,
if (m.nodeId !== null && this._isCloser(m.nodeId)) {
const node = {
id: m.nodeId,
token: m.token,
port: m.from.port,
host: m.from.host
})
}
this._pushClosest(node)
}
if (m.closerNodes !== null) {
for (const node of m.closerNodes) {
if (node.id.equals(this.dht.table.id)) continue
if (this.table.get(node.id)) continue
this.table.add({
visited: false,
id: node.id,
token: null,
port: node.port,
host: node.host
})
this._addPending(node.id, node.host, node.port)
}
}
if (!this._fromTable && this.successes + this.errors >= this.concurrency) {
this._slowdown = false
}
if (this.push(this.map(m)) !== false) this._readMore()
}
_pushClosest (node) {
this.closest.push(node)
for (let i = this.closest.length - 2; i >= 0; i--) {
const prev = this.closest[i]
const cmp = this._compare(prev.id, node.id)
// if sorted, done!
if (cmp < 0) break
// if dup, splice it out (rare)
if (cmp === 0) {
this.closest.splice(i + 1, 1)
break
}
// swap and continue down
this.closest[i + 1] = prev
this.closest[i] = node
}
if (this.closest.length > this.k) this.closest.pop()
}
_compare (a, b) {
for (let i = 0; i < a.length; i++) {
if (a[i] === b[i]) continue
const t = this.target[i]
return (t ^ a[i]) - (t ^ b[i])
}
return 0
}
_onerror () {
this.errors++
this.inflight--
@ -159,10 +200,8 @@ module.exports = class Query extends Readable {
}
_visit (node) {
node.visited = true
this.inflight++
this.dht.request(this.table.id, this.command, this.value, node)
this.dht.request(this.target, this.command, this.value, node)
.then(this._onresolve, this._onreject)
}
}

42
lib/rpc.js

@ -12,7 +12,11 @@ module.exports = class RPC {
this._onflush = onflush.bind(this)
this._tid = (Math.random() * 65536) | 0
this._drainInterval = null
this._tick = 0
this._w = 0
this._win = [0, 0, 0, 0]
this.maxWindow = 80 // ~100 per second burst, ~80 per second avg
this.maxRetries = 3
this.destroyed = false
this.inflight = []
@ -118,7 +122,7 @@ module.exports = class RPC {
if (this.destroyed) return Promise.reject(new Error('RPC socket destroyed'))
if (this._drainInterval === null) {
this._drainInterval = setInterval(this._drain.bind(this), 1500)
this._drainInterval = setInterval(this._drain.bind(this), 750)
if (this._drainInterval.unref) this._drainInterval.unref()
}
@ -132,16 +136,24 @@ module.exports = class RPC {
message.encode(state, m)
return new Promise((resolve, reject) => {
this.inflight.push({
const total = this._win[0] + this._win[1] + this._win[2] + this._win[3]
const req = {
timeout: 2,
tries: (opts && opts.retry === false) ? this.maxRetries : 0,
lastTry: 0,
tid: m.tid,
buffer: state.buffer,
to: m.to,
resolve,
reject
})
this._drain()
}
this.inflight.push(req)
if (total < 2 * this.maxWindow && this._win[this._w] < this.maxWindow) {
this._win[this._w]++
req.tries++
this._send(req.buffer, DEFAULT_TTL, req.to)
}
})
}
@ -171,7 +183,6 @@ module.exports = class RPC {
try {
m = message.decode(state)
} catch (err) {
console.log(err)
this.onwarning(err)
return
}
@ -231,16 +242,13 @@ module.exports = class RPC {
}
_drain () {
const now = Date.now()
let total = this._win[0] + this._win[1] + this._win[2] + this._win[3]
for (let i = 0; i < this.inflight.length; i++) {
const req = this.inflight[i]
if (now - req.lastTry < 3000) {
continue
}
req.lastTry = now
if (--req.timeout >= 0) continue
req.timeout = 2
if (req.tries++ > this.maxRetries) {
if (i === this.inflight.length - 1) this.inflight.pop()
@ -249,8 +257,18 @@ module.exports = class RPC {
continue
}
if (total >= 2 * this.maxWindow || this._win[this._w] >= this.maxWindow) {
req.tries--
continue
}
total++
this._win[this._w]++
this._send(req.buffer, DEFAULT_TTL, req.to)
}
this._w = (this._w + 1) & 3
this._win[this._w] = 0 // clear oldest
}
}

5
package.json

@ -12,10 +12,11 @@
"time-ordered-set": "^1.0.2"
},
"devDependencies": {
"standard": "^16.0.3"
"standard": "^16.0.3",
"tape": "^5.2.2"
},
"scripts": {
"test": "standard"
"test": "standard && tape test.js"
},
"repository": {
"type": "git",

61
test.js

@ -0,0 +1,61 @@
const tape = require('tape')
const DHT = require('./')
tape('make tiny swarm', async function (t) {
const swarm = await makeSwarm(2)
t.pass('could make swarm')
destroy(swarm)
})
tape('make bigger swarm', async function (t) {
const swarm = await makeSwarm(500)
const targetNode = swarm[25]
let q = swarm[499].query(targetNode.id, 'find_node', null)
let messages = 0
let found = false
for await (const data of q) {
messages++
if (data.nodeId.equals(targetNode.id)) {
found = true
break
}
}
t.ok(found, 'found target in ' + messages + ' message(s)')
q = swarm[490].query(targetNode.id, 'find_node', null, { closest: q.closest })
messages = 0
found = false
for await (const data of q) {
messages++
if (data.nodeId.equals(targetNode.id)) {
found = true
break
}
}
t.ok(found, 'found target again in ' + messages + ' message(s)')
destroy(swarm)
})
function destroy (list) {
for (const node of list) node.destroy()
}
async function makeSwarm (n) {
const node = new DHT()
await node.bind(0)
const all = [node]
const bootstrap = ['localhost:' + node.address().port]
while (all.length < n) {
const node = new DHT({ bootstrap })
await node.ready()
all.push(node)
}
return all
}
Loading…
Cancel
Save