Browse Source

use time-ordered-set

v4
Mathias Buus 6 years ago
parent
commit
fca4fe6242
  1. 62
      index.js
  2. 1
      package.json
  3. 4
      query-stream.js

62
index.js

@ -5,6 +5,7 @@ var events = require('events')
var peers = require('ipv4-peers')
var collect = require('stream-collector')
var sodium = require('sodium-universal')
var tos = require('time-ordered-set')
var nodes = peers.idLength(32)
var messages = require('./messages')
var queryStream = require('./query-stream')
@ -23,8 +24,8 @@ function DHT (opts) {
this.concurrency = opts.concurrency || 16
this.id = opts.id || randomBytes(32)
this.ephemeral = !!opts.ephemeral
this.nodes = new KBucket({localNodeId: this.id, arbiter: arbiter})
this.nodes.on('ping', onnodeping)
this.bucket = new KBucket({localNodeId: this.id, arbiter: arbiter})
this.bucket.on('ping', onnodeping)
this.inflightQueries = 0
this.socket = udp({
@ -37,6 +38,8 @@ function DHT (opts) {
this.socket.on('response', onresponse)
this.socket.on('close', onclose)
this.nodes = tos()
this._bootstrap = [].concat(opts.bootstrap || []).map(parseAddr)
this._queryId = this.ephemeral ? null : this.id
this._bootstrapped = false
@ -45,8 +48,6 @@ function DHT (opts) {
this._secrets = [randomBytes(32), randomBytes(32)]
this._secretsInterval = setInterval(rotateSecrets, 5 * 60 * 1000)
this._tickInterval = setInterval(tick, 5 * 1000)
this._top = null
this._bottom = null
if (opts.nodes) {
for (var i = 0; i < opts.nodes.length; i++) {
@ -109,7 +110,7 @@ DHT.prototype.update = function (query, opts, cb) {
DHT.prototype._pingSome = function () {
var cnt = this.inflightQueries > 2 ? 1 : 3
var oldest = this._bottom
var oldest = this.nodes.oldest
while (cnt--) {
if (!oldest || this._tick - oldest.tick < 3) continue
@ -132,7 +133,7 @@ DHT.prototype.ping = function (peer, cb) {
}
DHT.prototype.toArray = function () {
return this.nodes.toArray()
return this.bucket.toArray()
}
DHT.prototype.destroy = function () {
@ -285,7 +286,7 @@ DHT.prototype._onquery = function (request, peer) {
var res = {
id: self._queryId,
value: value || null,
nodes: nodes.encode(self.nodes.closest(request.target, 20)),
nodes: nodes.encode(self.bucket.closest(request.target, 20)),
roundtripToken: self._token(peer, 0)
}
@ -317,7 +318,7 @@ DHT.prototype._onfindnode = function (request, peer) {
var res = {
id: this._queryId,
nodes: nodes.encode(this.nodes.closest(request.target, 20)),
nodes: nodes.encode(this.bucket.closest(request.target, 20)),
roundtripToken: this._token(peer, 0)
}
@ -333,7 +334,7 @@ DHT.prototype._onnodeping = function (oldContacts, newContact) {
var old = oldContacts[i]
if (this._tick - old.tick < 3) { // less than 10s since we talked to this peer ...
this.nodes.add(oldContacts[i])
this.bucket.add(oldContacts[i])
continue
}
@ -365,7 +366,7 @@ DHT.prototype._reping = function (oldContacts, newContact) {
if (!err) return ping()
self._removeNode(next)
self.nodes.add(newContact)
self.bucket.add(newContact)
}
}
@ -376,7 +377,7 @@ DHT.prototype._token = function (peer, i) {
DHT.prototype._addNode = function (id, peer, token) {
if (id.equals(this.id)) return
var node = this.nodes.get(id)
var node = this.bucket.get(id)
var fresh = !node
if (!node) node = {}
@ -387,16 +388,16 @@ DHT.prototype._addNode = function (id, peer, token) {
node.roundtripToken = token
node.tick = this._tick
if (!fresh) remove(this, node)
add(this, node)
if (!fresh) this.nodes.remove(node)
this.nodes.add(node)
this.bucket.add(node)
if (fresh) this.emit('add-node', node)
}
DHT.prototype._removeNode = function (node) {
remove(this, node)
this.nodes.remove(node.id)
this.nodes.remove(node)
this.bucket.remove(node.id)
this.emit('remove-node', node)
}
@ -431,35 +432,6 @@ function arbiter (incumbant, candidate) {
return candidate
}
function remove (self, node) {
if (self._bottom !== node && self._top !== node) {
node.prev.next = node.next
node.next.prev = node.prev
node.next = node.prev = null
} else {
if (self._bottom === node) {
self._bottom = node.next
if (self._bottom) self._bottom.prev = null
}
if (self._top === node) {
self._top = node.prev
if (self._top) self._top.next = null
}
}
}
function add (self, node) {
if (!self._top && !self._bottom) {
self._top = self._bottom = node
node.prev = node.next = null
} else {
self._top.next = node
node.prev = self._top
node.next = null
self._top = node
}
}
function randomBytes (n) {
var buf = Buffer.allocUnsafe(n)
sodium.randombytes_buf(buf)

1
package.json

@ -12,6 +12,7 @@
"readable-stream": "^2.1.5",
"sodium-universal": "^2.0.0",
"stream-collector": "^1.0.1",
"time-ordered-set": "^1.0.1",
"udp-request": "^1.3.0",
"xor-distance": "^1.0.0"
},

4
query-stream.js

@ -81,7 +81,7 @@ QueryStream.prototype._finalize = function () {
QueryStream.prototype._bootstrap = function () {
this._bootstrapped = true
var bootstrap = this._dht.nodes.closest(this.target, this._k)
var bootstrap = this._dht.bucket.closest(this.target, this._k)
var i = 0
for (i = 0; i < bootstrap.length; i++) {
@ -145,7 +145,7 @@ QueryStream.prototype._callback = function (err, res, peer) {
if (err) {
if (res && res.id) {
var node = this._dht.nodes.get(res.id)
var node = this._dht.bucket.get(res.id)
if (node) this._dht._removeNode(node)
}
this.errors++

Loading…
Cancel
Save