From 86a4ff305eff68489e8898bd9cd771f0bb9dc73f Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Thu, 2 Sep 2021 13:27:02 +0200 Subject: [PATCH] make relaying more explicit --- index.js | 10 +++++----- lib/io.js | 29 +++++++++++++++++------------ test.js | 29 +++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 17 deletions(-) diff --git a/index.js b/index.js index bf50686..f8fb421 100644 --- a/index.js +++ b/index.js @@ -307,7 +307,7 @@ class DHT extends EventEmitter { // standard keep alive call if (req.command === 'ping') { - req.sendReply(0, null, false, false) + req.sendReply(0, null, false, false, req.from) return } @@ -317,14 +317,14 @@ class DHT extends EventEmitter { const port = c.uint16.decode({ start: 0, end: 2, buffer: req.value }) if (port === 0) return req.from.port = port - req.sendReply(0, null, false, false) + req.sendReply(0, null, false, false, req.from) return } // empty dht reply back if (req.command === 'find_node') { if (!req.target) return - req.sendReply(0, null, false, true) + req.sendReply(0, null, false, true, req.from) return } @@ -338,13 +338,13 @@ class DHT extends EventEmitter { this._check(node) } } - req.sendReply(0, null, false, false) + req.sendReply(0, null, false, false, req.from) return } // ask the user to handle it or reply back with a bad command if (this.onrequest(req) === false) { - req.sendReply(UNKNOWN_COMMAND, null, false, true) + req.sendReply(UNKNOWN_COMMAND, null, false, true, req.from) } } diff --git a/lib/io.js b/lib/io.js index 5aba4b9..d263c43 100644 --- a/lib/io.js +++ b/lib/io.js @@ -235,18 +235,23 @@ class Request { } reply (value, opts = {}) { - this.sendReply(0, value || null, opts.token !== false, this.target !== null && opts.closerNodes !== false) + this.sendReply(0, value || null, opts.token !== false, this.target !== null && opts.closerNodes !== false, opts.to || this.from) } error (code, opts = {}) { - this.sendReply(code, null, opts.token === true, this.target !== null && opts.closerNodes !== false) + this.sendReply(code, null, opts.token === true, this.target !== null && opts.closerNodes !== false, opts.to || this.from) + } + + relay (value, to) { + const buffer = this._encodeRequest(null, value) + this.socket.send(buffer, 0, buffer.byteLength, to.port, to.host) } send (force = false) { if (this.destroyed) return if (this.socket === null) return - if (this._buffer === null) this._buffer = this._encodeRequest() + if (this._buffer === null) this._buffer = this._encodeRequest(this.token, this.value) if (!force && this._io.congestion.isFull()) { this._io._pending.push(this) @@ -278,7 +283,7 @@ class Request { this.onerror(err || DESTROY, this) } - sendReply (error, value, token, hasCloserNodes) { + sendReply (error, value, token, hasCloserNodes, from) { if (this.socket === null || this.destroyed) return const id = this._io.ephemeral === false && this.socket === this._io.serverSocket @@ -296,7 +301,7 @@ class Request { state.buffer[state.start++] = (id ? 1 : 0) | (token ? 2 : 0) | (closerNodes.length > 0 ? 4 : 0) | (error > 0 ? 8 : 0) | (value ? 16 : 0) c.uint16.encode(state, this.tid) - peer.ipv4.encode(state, this.from) + peer.ipv4.encode(state, from) if (id) c.fixed32.encode(state, this._io.table.id) if (token) c.fixed32.encode(state, this._io.token(this.to, 1)) @@ -304,35 +309,35 @@ class Request { if (error > 0) c.uint.encode(state, error) if (value) c.buffer.encode(state, value) - this.socket.send(state.buffer, 0, state.buffer.byteLength, this.from.port, this.from.host) + this.socket.send(state.buffer, 0, state.buffer.byteLength, from.port, from.host) } - _encodeRequest () { + _encodeRequest (token, value) { const id = this._io.ephemeral === false && this.socket === this._io.serverSocket const state = { start: 0, end: 1 + 1 + 6 + 2, buffer: null } // (type | version) + flags + to + tid if (id) state.end += 32 - if (this.token) state.end += 32 + if (token) state.end += 32 c.string.preencode(state, this.command) if (this.target) state.end += 32 - if (this.value) c.buffer.preencode(state, this.value) + if (value) c.buffer.preencode(state, value) state.buffer = Buffer.allocUnsafe(state.end) state.buffer[state.start++] = REQUEST_ID - state.buffer[state.start++] = (id ? 1 : 0) | (this.token ? 2 : 0) | (this.target ? 4 : 0) | (this.value ? 8 : 0) + state.buffer[state.start++] = (id ? 1 : 0) | (token ? 2 : 0) | (this.target ? 4 : 0) | (value ? 8 : 0) c.uint16.encode(state, this.tid) peer.ipv4.encode(state, this.to) if (id) c.fixed32.encode(state, this._io.table.id) - if (this.token) c.fixed32.encode(state, this.token) + if (token) c.fixed32.encode(state, token) c.string.encode(state, this.command) if (this.target) c.fixed32.encode(state, this.target) - if (this.value) c.buffer.encode(state, this.value) + if (value) c.buffer.encode(state, value) return state.buffer } diff --git a/test.js b/test.js index 2227a28..c1a18f7 100644 --- a/test.js +++ b/test.js @@ -258,6 +258,35 @@ tape('set bind', async function (t) { b.destroy() }) +tape('relay', async function (t) { + const [bootstrap, a, b, c] = await makeSwarm(4) + + b.on('request', function (req) { + t.same(req.command, 'route', 'b got request') + t.same(req.from.port, a.address().port, 'from a') + const value = Buffer.concat([req.value, Buffer.from('b')]) + req.relay(value, { host: '127.0.0.1', port: c.address().port }) + }) + + c.on('request', function (req) { + t.same(req.command, 'route', 'c got request') + t.same(req.from.port, b.address().port, 'from b') + const value = Buffer.concat([req.value, Buffer.from('c')]) + req.reply(value, { to: { host: '127.0.0.1', port: a.address().port } }) + }) + + const res = await a.request({ command: 'route', value: Buffer.from('a') }, { host: '127.0.0.1', port: b.address().port }) + + t.same(res.value, Buffer.from('abc')) + t.same(res.from.port, c.address().port) + t.same(res.to.port, a.address().port) + + bootstrap.destroy() + a.destroy() + b.destroy() + c.destroy() +}) + function destroy (list) { for (const node of list) node.destroy() }