Browse Source

add setEphemeral method (#9)

* add turnNonEphemeral method

* rename to joinDht

* joinDht functionality and tests

* update ephemeral flag only after successful join dht

* joinDht -> setEphemeral

* setEphem(true) test, currently failing

* setEphemeral(true) now working

* rm left over debug artifacts

* update docs to api alteration

* trigger newly ephem node gc from the ght via requests instead of ping, as we need ping for holepunching to bootstrap nodes

* charCodeAt instead of string slice
v4
David Mark Clements 5 years ago
committed by GitHub
parent
commit
74c4a740e1
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      README.md
  2. 28
      index.js
  3. 22
      lib/io.js
  4. 6
      lib/messages.js
  5. 6
      lib/query-stream.js
  6. 74
      test.js

4
README.md

@ -222,6 +222,10 @@ Peer should look like this:
Explicitly bind the dht node to a certain port/address.
#### `node.setEpehemeral(boolean)`
Dynamically convert the node into ephemeral (leave the DHT) or non-ephemeral (join the DHT).
#### `node.on('listening')`
Emitted when the node starts listening on a udp port.

28
index.js

@ -101,7 +101,7 @@ class DHT extends EventEmitter {
_onping (message, peer) {
if (message.value && !this.id.equals(message.value)) return
this._io.response(message, peers.encode([ peer ]), null, peer)
this._io.response(message, peers.encode([peer]), null, peer)
}
_onholepunch (message, peer) {
@ -112,7 +112,7 @@ class DHT extends EventEmitter {
const to = decodePeer(value.to)
if (!to || samePeer(to, peer)) return
message.id = this._io.id
message.value = Holepunch.encode({ from: peers.encode([ peer ]) })
message.value = Holepunch.encode({ from: peers.encode([peer]) })
this.emit('holepunch', peer, to)
this._io.send(Message.encode(message), to)
return
@ -225,7 +225,6 @@ class DHT extends EventEmitter {
_onnodeping (oldContacts, newContact) {
// if bootstrapping, we've recently pinged all nodes
if (!this.bootstrapped) return
const reping = []
for (var i = 0; i < oldContacts.length; i++) {
@ -246,7 +245,9 @@ class DHT extends EventEmitter {
_check (node) {
const self = this
this.ping(node, function (err) {
if (err) self._removeNode(node)
if (err) {
self._removeNode(node)
}
})
}
@ -271,7 +272,6 @@ class DHT extends EventEmitter {
_pingSome () {
var cnt = this.inflightQueries > 2 ? 1 : 3
var oldest = this.nodes.oldest
// tiny dht, ping the bootstrap again
if (!oldest) return this.bootstrap()
@ -338,6 +338,24 @@ class DHT extends EventEmitter {
qs._concurrency = self.inflightQueries === 1 ? self.concurrency : backgroundCon
}
}
setEphemeral (ephemeral = false, cb) {
if (ephemeral === true) {
this._io._updateId(null)
this.ephemeral = true
if (cb) process.nextTick(cb)
return
}
this._io._updateId(this.id)
this.bootstrap((err) => {
if (err) {
if (cb) cb(err)
return
}
this.ephemeral = false
if (cb) cb()
})
}
}
exports.QUERY = DHT.QUERY = IO.QUERY

22
lib/io.js

@ -24,7 +24,7 @@ class IO {
this._rid = (Math.random() * 65536) | 0
this._requests = new Array(65536)
this._pending = []
this._secrets = [ randomBytes(32), randomBytes(32) ]
this._secrets = [randomBytes(32), randomBytes(32)]
this._ticking = false
this._tickInterval = setInterval(this._ontick.bind(this), 750)
this._rotateInterval = setInterval(this._onrotate.bind(this), 300000)
@ -78,7 +78,7 @@ class IO {
rid,
command: '_holepunch',
value: Holepunch.encode({
to: peers.encode([ req.peer ])
to: peers.encode([req.peer])
})
}
@ -219,6 +219,19 @@ class IO {
}
response (request, value, closerNodes, peer) {
if (this._ctx.ephemeral) {
const privateCommand = request.command.charCodeAt(0) === 95 // '_'
if (privateCommand === false) {
// if a node was non-ephemeral and then was
// set to ephemeral, other DHT nodes will
// still have a reference to the node. Therefore
// if the node is ephemeral and the request
// is not a DHT internals command ignore the
// request to make other nodes remove it from
// their list.
return
}
}
const message = {
type: TYPE.RESPONSE,
rid: request.rid,
@ -227,7 +240,6 @@ class IO {
roundtripToken: this._token(peer, 0),
value
}
this.send(Message.encode(message), peer)
}
@ -282,6 +294,10 @@ class IO {
value
}, peer, callback)
}
_updateId (id) {
this.id = id
}
}
IO.QUERY = QUERY

6
lib/messages.js

@ -11,9 +11,9 @@ var varint = encodings.varint
var skip = encodings.skip
exports.TYPE = {
"QUERY": 1,
"UPDATE": 2,
"RESPONSE": 3
QUERY: 1,
UPDATE: 2,
RESPONSE: 3
}
var Holepunch = exports.Holepunch = {

6
lib/query-stream.js

@ -48,7 +48,6 @@ class QueryStream extends Readable {
_onresponse (err, message, peer, request, to, type) {
this.inflight--
if (err && to && to.id) {
// Request, including retries, failed completely
// Remove the "to" node.
@ -96,6 +95,11 @@ class QueryStream extends Readable {
return
}
if (message.id === null) {
this._readMaybe()
return
}
const value = this._outputEncoding
? this._decodeOutput(message.value)
: message.value

74
test.js

@ -212,6 +212,80 @@ tape('timeouts', function (t) {
})
})
tape('setEphemeral(false)', function (t) {
bootstrap(function (port, node) {
const a = dht({ bootstrap: port, ephemeral: true })
const b = dht({ bootstrap: port })
a.command('hello', {
query (data, callback) {
callback(null, Buffer.from('world'))
}
})
a.ready(function () {
b.ready(function () {
const key = blake2b(Buffer.from('hello'))
b.query('hello', key, (err, result) => {
t.error(err)
t.is(result.length, 0)
a.setEphemeral(false, (err) => {
t.error(err)
b.query('hello', key, (err, result) => {
t.error(err)
t.is(result.length, 1)
t.is(Buffer.compare(result[0].node.id, a.id), 0)
a.destroy()
b.destroy()
node.destroy()
t.end()
})
})
})
})
})
})
})
tape('setEphemeral(true)', function (t) {
bootstrap(function (port, node) {
const a = dht({ bootstrap: port, ephemeral: false })
const b = dht({ bootstrap: port })
a.command('hello', {
query (data, callback) {
callback(null, Buffer.from('world'))
}
})
a.ready(function () {
b.ready(function () {
const key = blake2b(Buffer.from('hello'))
b.query('hello', key, (err, result) => {
t.error(err)
t.is(result.length, 1)
t.is(Buffer.compare(result[0].node.id, a.id), 0)
a.setEphemeral(true, (err) => {
t.error(err)
b.query('hello', key, (err, result) => {
t.ok(err)
})
b.once('remove-node', () => {
b.query('hello', key, (err, result) => {
t.error(err)
t.is(result.length, 0)
a.destroy()
b.destroy()
node.destroy()
t.end()
})
})
})
})
})
})
})
})
function bootstrap (done) {
const node = dht({
ephemeral: true

Loading…
Cancel
Save