Browse Source

more fixes

session-estimator
Mathias Buus 4 years ago
parent
commit
cbb2feec27
  1. 48
      index.js
  2. 4
      lib/messages.js
  3. 57
      lib/query.js
  4. 10
      lib/rpc.js
  5. 12
      test.js

48
index.js

@ -42,11 +42,11 @@ class Request {
} }
error (code) { error (code) {
this.dht._reply(this.rpc, this.tid, this.target, code, null, false, this.from) return this.dht._reply(this.rpc, this.tid, this.target, code, null, false, this.from)
} }
reply (value, token = true) { reply (value, token = true) {
this.dht._reply(this.rpc, this.tid, this.target, 0, value, token, this.from) return this.dht._reply(this.rpc, this.tid, this.target, 0, value, token, this.from)
} }
} }
@ -88,7 +88,7 @@ class DHT extends EventEmitter {
this.table.on('row', (row) => row.on('full', (node) => this._onfullrow(node, row))) this.table.on('row', (row) => row.on('full', (node) => this._onfullrow(node, row)))
} }
get id () { get nodeId () {
return this.table.id return this.table.id
} }
@ -137,6 +137,17 @@ class DHT extends EventEmitter {
return race(p, min, opts.max) return race(p, min, opts.max)
} }
// TODO: make this more smart - ie don't retry the first one etc etc
async requestAny (target, command, value, nodes, opts) {
for (const node of nodes) {
try {
return await this.request(target, command, value, node, opts)
} catch {}
}
throw new Error('All requests failed')
}
destroy () { destroy () {
this.rpc.destroy() this.rpc.destroy()
clearInterval(this._tickInterval) clearInterval(this._tickInterval)
@ -173,7 +184,7 @@ class DHT extends EventEmitter {
refresh () { refresh () {
const node = this.table.random() const node = this.table.random()
this._backgroundQuery(node ? node.id : this.table.id, 'find_node', null) this._backgroundQuery(node ? node.nodeId : this.table.id, 'find_node', null)
} }
_pingSomeBootstrapNodes () { _pingSomeBootstrapNodes () {
@ -185,7 +196,7 @@ class DHT extends EventEmitter {
this._pingBootstrapTicks = REFRESH_TICKS this._pingBootstrapTicks = REFRESH_TICKS
const nodes = this.table.closest(PING_BOOTSTRAP, 1) const nodes = this.table.closest(PING_BOOTSTRAP, 1)
if (nodes.length === 0 || compare(PING_BOOTSTRAP, this.table.id, nodes[0].id) > 0) { if (nodes.length === 0 || compare(PING_BOOTSTRAP, this.table.id, nodes[0].nodeId) > 0) {
return return
} }
@ -194,7 +205,7 @@ class DHT extends EventEmitter {
q.on('close', () => { q.on('close', () => {
if (q.closest.length === 0) return if (q.closest.length === 0) return
if (compare(PING_BOOTSTRAP, this.table.id, q.closest[q.closest.length - 1].id) > 0) { if (compare(PING_BOOTSTRAP, this.table.id, q.closest[q.closest.length - 1].nodeId) > 0) {
return return
} }
@ -309,7 +320,7 @@ class DHT extends EventEmitter {
_repingMaybe () { _repingMaybe () {
while (this._repinging < 3 && this._reping.isEmpty() === false) { while (this._repinging < 3 && this._reping.isEmpty() === false) {
const { newNode, row } = this._reping.shift() const { newNode, row } = this._reping.shift()
if (this.table.get(newNode.id)) continue if (this.table.get(newNode.nodeId)) continue
let oldest = null let oldest = null
for (const node of row.nodes) { for (const node of row.nodes) {
@ -331,7 +342,7 @@ class DHT extends EventEmitter {
this.ping(oldNode).then(onsuccess, onswap) this.ping(oldNode).then(onsuccess, onswap)
function onsuccess (m) { function onsuccess (m) {
if (m.nodeId === null || !m.nodeId.equals(oldNode.id)) return onswap() if (m.nodeId === null || !m.nodeId.equals(oldNode.nodeId)) return onswap()
self._repinging-- self._repinging--
self._repingMaybe() self._repingMaybe()
} }
@ -352,14 +363,14 @@ class DHT extends EventEmitter {
for (const node of this.bootstrapNodes) { for (const node of this.bootstrapNodes) {
dns.lookup(node.host, (_, host) => { dns.lookup(node.host, (_, host) => {
if (host) nodes.push({ id: node.id || null, host, port: node.port }) if (host) nodes.push({ nodeId: node.nodeId || null, host, port: node.port })
if (--missing === 0) done(nodes) if (--missing === 0) done(nodes)
}) })
} }
} }
_addNode (node) { _addNode (node) {
if (this.nodes.has(node) || node.id.equals(this.table.id)) return if (this.nodes.has(node) || node.nodeId.equals(this.table.id)) return
node.added = node.seen = this._tick node.added = node.seen = this._tick
@ -369,14 +380,14 @@ class DHT extends EventEmitter {
} }
_maybeRemoveNode (node, expectedId) { _maybeRemoveNode (node, expectedId) {
if (expectedId !== null && expectedId.equals(node.id)) return if (expectedId !== null && expectedId.equals(node.nodeId)) return
this._removeNode(node) this._removeNode(node)
} }
_removeNode (node) { _removeNode (node) {
if (!this.nodes.has(node)) return if (!this.nodes.has(node)) return
this.table.remove(node.id) this.table.remove(node.nodeId)
this.nodes.remove(node) this.nodes.remove(node)
this.emit('remove-node', node) this.emit('remove-node', node)
@ -398,10 +409,11 @@ class DHT extends EventEmitter {
this._nat.add(m.to) this._nat.add(m.to)
this._addNode({ this._addNode({
id: m.nodeId, nodeId: m.nodeId,
token: null, token: null,
port: m.from.port, port: m.from.port,
host: m.from.host, host: m.from.host,
id: m.nodeId, // alias for nodeId as the routing table expects that
added: this._tick, added: this._tick,
seen: this._tick, seen: this._tick,
prev: null, prev: null,
@ -459,8 +471,7 @@ class DHT extends EventEmitter {
_reply (rpc, tid, target, status, value, token, to) { _reply (rpc, tid, target, status, value, token, to) {
const closerNodes = target ? this.table.closest(target) : null const closerNodes = target ? this.table.closest(target) : null
const persistent = !this.ephemeral && rpc === this.rpc const persistent = !this.ephemeral && rpc === this.rpc
const reply = {
rpc.send({
version: 1, version: 1,
tid, tid,
from: null, from: null,
@ -472,7 +483,10 @@ class DHT extends EventEmitter {
command: null, command: null,
status, status,
value value
}) }
rpc.send(reply)
return reply
} }
} }
@ -492,7 +506,7 @@ function parseNode (s) {
if (!port) throw new Error('Node format is id@?host:port') if (!port) throw new Error('Node format is id@?host:port')
return { return {
id: id ? Buffer.from(id.slice(0, -1), 'hex') : null, nodeId: id ? Buffer.from(id.slice(0, -1), 'hex') : null,
host, host,
port: Number(port.slice(1)) port: Number(port.slice(1))
} }

4
lib/messages.js

@ -38,13 +38,13 @@ const dhtPeerIPv4 = exports.dhtPeerIPv4 = {
state.end += 6 + 32 state.end += 6 + 32
}, },
encode (state, peer) { encode (state, peer) {
cenc.fixed32.encode(state, peer.id) cenc.fixed32.encode(state, peer.nodeId)
IPv4.encode(state, peer.host) IPv4.encode(state, peer.host)
cenc.uint16.encode(state, peer.port) cenc.uint16.encode(state, peer.port)
}, },
decode (state) { decode (state) {
return { return {
id: cenc.fixed32.decode(state), nodeId: cenc.fixed32.decode(state),
host: IPv4.decode(state), host: IPv4.decode(state),
port: cenc.uint16.decode(state) port: cenc.uint16.decode(state)
} }

57
lib/query.js

@ -15,8 +15,8 @@ module.exports = class Query extends Readable {
this.concurrency = opts.concurrency || this.k this.concurrency = opts.concurrency || this.k
this.inflight = 0 this.inflight = 0
this.map = opts.map || defaultMap this.map = opts.map || defaultMap
this.closest = []
this._closestReplies = []
this._slowdown = false this._slowdown = false
this._seen = new Set() this._seen = new Set()
this._pending = [] this._pending = []
@ -32,11 +32,15 @@ module.exports = class Query extends Readable {
// add them reverse as we pop below // add them reverse as we pop below
for (let i = nodes.length - 1; i >= 0; i--) { for (let i = nodes.length - 1; i >= 0; i--) {
const node = nodes[i] const node = nodes[i]
this._addPending(node.id, node.host, node.port) this._addPending(node.nodeId, node.host, node.port)
} }
} }
} }
get closest () {
return this._closestReplies.map(r => ({ nodeId: r.nodeId, host: r.from.host, port: r.from.port }))
}
finished () { finished () {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const self = this const self = this
@ -61,7 +65,7 @@ module.exports = class Query extends Readable {
async commit (command = this.command, value = this.value, opts) { async commit (command = this.command, value = this.value, opts) {
if (typeof command === 'object' && command) return this.commit(undefined, undefined, command) if (typeof command === 'object' && command) return this.commit(undefined, undefined, command)
return this.dht.requestAll(this.target, command, value, this.closest, opts) return this.dht.requestAll(this.target, command, value, this._closestReplies, opts)
} }
async toArray () { async toArray () {
@ -78,7 +82,7 @@ module.exports = class Query extends Readable {
const closest = this.dht.table.closest(this.target, this.k - this._pending.length) const closest = this.dht.table.closest(this.target, this.k - this._pending.length)
for (const node of closest) { for (const node of closest) {
this._addPending(node.id, node.host, node.port) this._addPending(node.nodeId, node.host, node.port)
} }
} }
@ -88,22 +92,22 @@ module.exports = class Query extends Readable {
this.dht._resolveBootstrapNodes((bootstrapNodes) => { this.dht._resolveBootstrapNodes((bootstrapNodes) => {
for (const node of bootstrapNodes) { for (const node of bootstrapNodes) {
this._addPending(node.id, node.host, node.port) this._addPending(node.nodeId, node.host, node.port)
} }
cb(null) cb(null)
}) })
} }
_isCloser (id) { _isCloser (id) {
return this.closest.length < this.k || this._compare(id, this.closest[this.closest.length - 1].id) < 0 return this._closestReplies.length < this.k || this._compare(id, this._closestReplies[this._closestReplies.length - 1].nodeId) < 0
} }
_addPending (id, host, port) { _addPending (nodeId, host, port) {
if (id && !this._isCloser(id)) return if (nodeId && !this._isCloser(nodeId)) return
const addr = host + ':' + port const addr = host + ':' + port
if (this._seen.has(addr)) return if (this._seen.has(addr)) return
this._seen.add(addr) this._seen.add(addr)
this._pending.push({ id, host, port }) this._pending.push({ nodeId, host, port })
} }
_read (cb) { _read (cb) {
@ -118,7 +122,7 @@ module.exports = class Query extends Readable {
while (this.inflight < concurrency && this._pending.length > 0) { while (this.inflight < concurrency && this._pending.length > 0) {
const next = this._pending.pop() const next = this._pending.pop()
if (next && next.id && !this._isCloser(next.id)) continue if (next && next.nodeId && !this._isCloser(next.nodeId)) continue
this._visit(next) this._visit(next)
} }
@ -145,7 +149,7 @@ module.exports = class Query extends Readable {
return return
} }
if (!this.closest.length) { if (!this._closestReplies.length) {
this.destroy(new Error('Too few nodes responded')) this.destroy(new Error('Too few nodes responded'))
return return
} }
@ -154,7 +158,7 @@ module.exports = class Query extends Readable {
this._commiting = true this._commiting = true
const p = [] const p = []
for (const node of this.closest) p.push(this._commit(this.dht, this.target, node)) for (const node of this._closestReplies) p.push(this._commit(this.dht, this.target, node))
race(p, 1, p.length) race(p, 1, p.length)
.then(() => this.push(null), (err) => this.destroy(err)) .then(() => this.push(null), (err) => this.destroy(err))
@ -167,20 +171,13 @@ module.exports = class Query extends Readable {
this.inflight-- this.inflight--
if (m.nodeId !== null && this._isCloser(m.nodeId)) { if (m.nodeId !== null && this._isCloser(m.nodeId)) {
const node = { this._pushClosest(m)
id: m.nodeId,
token: m.token,
port: m.from.port,
host: m.from.host
}
this._pushClosest(node)
} }
if (m.closerNodes !== null) { if (m.closerNodes !== null) {
for (const node of m.closerNodes) { for (const node of m.closerNodes) {
if (node.id.equals(this.dht.table.id)) continue if (node.nodeId.equals(this.dht.table.id)) continue
this._addPending(node.id, node.host, node.port) this._addPending(node.nodeId, node.host, node.port)
} }
} }
@ -206,22 +203,22 @@ module.exports = class Query extends Readable {
} }
_pushClosest (node) { _pushClosest (node) {
this.closest.push(node) this._closestReplies.push(node)
for (let i = this.closest.length - 2; i >= 0; i--) { for (let i = this._closestReplies.length - 2; i >= 0; i--) {
const prev = this.closest[i] const prev = this._closestReplies[i]
const cmp = this._compare(prev.id, node.id) const cmp = this._compare(prev.nodeId, node.nodeId)
// if sorted, done! // if sorted, done!
if (cmp < 0) break if (cmp < 0) break
// if dup, splice it out (rare) // if dup, splice it out (rare)
if (cmp === 0) { if (cmp === 0) {
this.closest.splice(i + 1, 1) this._closestReplies.splice(i + 1, 1)
break break
} }
// swap and continue down // swap and continue down
this.closest[i + 1] = prev this._closestReplies[i + 1] = prev
this.closest[i] = node this._closestReplies[i] = node
} }
if (this.closest.length > this.k) this.closest.pop() if (this._closestReplies.length > this.k) this._closestReplies.pop()
} }
_compare (a, b) { _compare (a, b) {

10
lib/rpc.js

@ -238,13 +238,13 @@ module.exports = class RPC {
for (let i = 0; i < this.inflight.length; i++) { for (let i = 0; i < this.inflight.length; i++) {
const req = this.inflight[i] const req = this.inflight[i]
if (--req.timeout >= 0) continue if (req.tries > 0 && --req.timeout >= 0) continue
req.timeout = 2 req.timeout = 2
if (req.tries++ > this.maxRetries) { if (req.tries++ > this.maxRetries) {
if (i === this.inflight.length - 1) this.inflight.pop() if (i === this.inflight.length - 1) this.inflight.pop()
else this.inflight[i] = this.inflight.pop() else this.inflight[i] = this.inflight.pop()
req.reject(new Error('Request timed out')) req.reject(createTimeoutError())
continue continue
} }
@ -263,6 +263,12 @@ module.exports = class RPC {
} }
} }
function createTimeoutError () {
const err = new Error('Request timed out')
err.status = 0
return err
}
function createStatusError (status) { function createStatusError (status) {
const err = new Error('Request failed with status ' + status) const err = new Error('Request failed with status ' + status)
err.status = status err.status = status

12
test.js

@ -12,13 +12,13 @@ tape('make bigger swarm', async function (t) {
const targetNode = swarm[25] const targetNode = swarm[25]
let q = swarm[499].query(targetNode.id, 'find_node', null) let q = swarm[499].query(targetNode.nodeId, 'find_node', null)
let messages = 0 let messages = 0
let found = false let found = false
for await (const data of q) { for await (const data of q) {
messages++ messages++
if (data.nodeId.equals(targetNode.id)) { if (data.nodeId.equals(targetNode.nodeId)) {
found = true found = true
break break
} }
@ -26,13 +26,13 @@ tape('make bigger swarm', async function (t) {
t.ok(found, 'found target in ' + messages + ' message(s)') t.ok(found, 'found target in ' + messages + ' message(s)')
q = swarm[490].query(targetNode.id, 'find_node', null, { closest: q.closest }) q = swarm[490].query(targetNode.nodeId, 'find_node', null, { closest: q.closest })
messages = 0 messages = 0
found = false found = false
for await (const data of q) { for await (const data of q) {
messages++ messages++
if (data.nodeId.equals(targetNode.id)) { if (data.nodeId.equals(targetNode.nodeId)) {
found = true found = true
break break
} }
@ -67,8 +67,8 @@ tape('commit after query', async function (t) {
} }
const q = swarm[42].query(swarm[0].table.id, 'before', null, { const q = swarm[42].query(swarm[0].table.id, 'before', null, {
commit (node, target, to) { commit (node, target, m) {
return node.request(target, 'after', null, to) return node.request(target, 'after', null, { token: m.token, ...m.from })
} }
}) })

Loading…
Cancel
Save