Browse Source

use udx instead of built-in dgram for udp support (#40)

* Switch to `udx-native`

* Bump `udx-native`

* Bump `udx-native`

* Add `events` dependency

* Update to new UDX API

* Update Brittle

* Fix typo

* UDX -> UDP

* Make `_bindSockets()` sync

* Make `_bindSockets()` async

* Use `opts.udx`

* Track `udx-native@latest`

* Update to latest UDX API

* Switch back to `udx-native@latest`

* Use stable `udx-native`

* Emit `network-change` events

* Increase test timeout
session-estimator
Kasper Isager Dalsgarð 2 years ago
committed by GitHub
parent
commit
925d886382
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      README.md
  2. 20
      index.js
  3. 66
      lib/io.js
  4. 7
      package.json
  5. 25
      test.js

22
README.md

@ -116,12 +116,12 @@ Options include:
{
// A list of bootstrap nodes
bootstrap: [ 'bootstrap-node.com:24242', ... ],
// Optionally pass in your own UDP socket to use.
socket: udpSocket,
// Optionally pass in array of { host, port } to add to the routing table if you know any peers
nodes: [{ host, port }, ...],
// Optionally pass a port you prefer to bind to instead of a random one
bind: 0,
port: 0,
// Optionally pass a UDX instance on which sockets will be created.
udx,
// dht-rpc will automatically detect if you are firewalled. If you know that you are not set this to false
firewalled: true
}
@ -133,7 +133,7 @@ For the vast majority of use-cases you should always use adaptive mode to ensure
Your DHT routing id is `hash(publicIp + publicPort)` and will be autoconfigured internally.
#### `const node = DHT.boostrapper(bind, [options])`
#### `const node = DHT.bootrapper(port, [options])`
Sugar for the options needed to run a bootstrap node, ie
@ -144,7 +144,7 @@ Sugar for the options needed to run a bootstrap node, ie
}
```
Additionally since you'll want a known port for a bootstrap node it adds the bind option as a primary argument.
Additionally since you'll want a known port for a bootstrap node it adds the `port` option as a primary argument.
#### `await node.ready()`
@ -165,7 +165,7 @@ Emitted when the routing table is fully bootstrapped. Emitted as a conveinience.
#### `node.on('listening')`
Emitted when the underlying UDP socket is listening. Emitted as a conveinience.
Emitted when the underlying UDX socket is listening. Emitted as a conveinience.
#### `node.on('persistent')`
@ -179,6 +179,10 @@ you are on an open NAT.
Emitted when the node has detected that the computer has gone to sleep. If this happens,
it will switch from persistent mode to ephemeral again.
#### `node.on('network-change', interfaces)`
Emitted when the network interfaces of the computer change.
#### `node.refresh()`
Refresh the routing table by looking up a random node in the background.
@ -201,12 +205,12 @@ Boolean indicated if your node is behind a firewall.
This is auto detected by having other node's trying to do a PING to you
without you contacting them first.
#### `const udpAddr = node.address()`
#### `const addr = node.address()`
Get the local address of the UDP socket bound.
Note that if you are in ephemeral mode, this will return a different
port than the one you provided in the constructor (under bind), as ephemeral
port than the one you provided in the constructor (under `port`), as ephemeral
mode always uses a random port.
#### `node.on('request', req)`
@ -242,7 +246,7 @@ Options include:
```js
{
retry: true, // whether the request should retry on timeout
socket: udpSocket // request on this specific socket
socket: udxSocket // request on this specific socket
}
```

20
index.js

@ -2,6 +2,7 @@ const dns = require('dns')
const { EventEmitter } = require('events')
const Table = require('kademlia-routing-table')
const TOS = require('time-ordered-set')
const UDX = require('udx-native')
const sodium = require('sodium-universal')
const c = require('compact-encoding')
const NatSampler = require('nat-sampler')
@ -28,7 +29,8 @@ class DHT extends EventEmitter {
this.bootstrapNodes = opts.bootstrap === false ? [] : (opts.bootstrap || []).map(parseNode)
this.table = new Table(opts.id || randomBytes(32))
this.nodes = new TOS()
this.io = new IO(this.table, {
this.udx = opts.udx || new UDX()
this.io = new IO(this.table, this.udx, {
...opts,
onrequest: this._onrequest.bind(this),
onresponse: this._onresponse.bind(this),
@ -43,7 +45,7 @@ class DHT extends EventEmitter {
this.destroyed = false
this._nat = new NatSampler()
this._bind = opts.bind || 0
this._port = opts.port || 0
this._quickFirewall = opts.quickFirewall !== false
this._forcePersistent = opts.ephemeral === false
this._repinging = 0
@ -60,13 +62,15 @@ class DHT extends EventEmitter {
this.table.on('row', this._onrow)
this.io.networkInterfaces.on('change', (interfaces) => this._onnetworkchange(interfaces))
if (opts.nodes) {
for (const node of opts.nodes) this.addNode(node)
}
}
static bootstrapper (bind, opts) {
return new this({ bind, firewalled: false, bootstrap: [], ...opts })
static bootstrapper (port, opts) {
return new this({ port, firewalled: false, bootstrap: [], ...opts })
}
get id () {
@ -329,6 +333,10 @@ class DHT extends EventEmitter {
this._repingAndSwap(newNode, oldest)
}
_onnetworkchange (interfaces) {
this.emit('network-change', interfaces)
}
_repingAndSwap (newNode, oldNode) {
const self = this
const lastSeen = oldNode.seen
@ -619,8 +627,8 @@ class DHT extends EventEmitter {
return false
function onmessage (_, rinfo) {
hosts.push(rinfo.address)
function onmessage (_, { host }) {
hosts.push(host)
}
}

66
lib/io.js

@ -1,7 +1,6 @@
const FIFO = require('fast-fifo')
const sodium = require('sodium-universal')
const c = require('compact-encoding')
const bind = require('bind-easy')
const b4a = require('b4a')
const peer = require('./peer')
const errors = require('./errors')
@ -13,14 +12,16 @@ const TMP = b4a.alloc(32)
const EMPTY_ARRAY = []
module.exports = class IO {
constructor (table, { maxWindow = 80, bind = 0, firewalled = true, onrequest, onresponse = noop, ontimeout = noop } = {}) {
constructor (table, udx, { maxWindow = 80, port = 0, firewalled = true, onrequest, onresponse = noop, ontimeout = noop } = {}) {
this.table = table
this.udx = udx
this.inflight = []
this.clientSocket = null
this.serverSocket = null
this.firewalled = firewalled !== false
this.ephemeral = true
this.congestion = new CongestionWindow(maxWindow)
this.networkInterfaces = udx.watchNetworkInterfaces()
this.onrequest = onrequest
this.onresponse = onresponse
@ -33,13 +34,13 @@ module.exports = class IO {
this._drainInterval = null
this._destroying = null
this._binding = null
this._bind = bind
this._port = port
}
onmessage (socket, buffer, rinfo) {
if (buffer.byteLength < 2 || !(rinfo.port > 0 && rinfo.port < 65536)) return
onmessage (socket, buffer, { host, port }) {
if (buffer.byteLength < 2 || !(port > 0 && port < 65536)) return
const from = { id: null, host: rinfo.address, port: rinfo.port }
const from = { id: null, host, port }
const state = { start: 1, end: buffer.byteLength, buffer }
const expectedSocket = this.firewalled ? this.clientSocket : this.serverSocket
const external = socket !== expectedSocket
@ -114,16 +115,11 @@ module.exports = class IO {
req.onerror(errors.createDestroyedError(), req)
}
this._destroying = new Promise((resolve) => {
let missing = 2
this.serverSocket.close(done)
this.clientSocket.close(done)
function done () {
if (--missing === 0) resolve()
}
})
this._destroying = Promise.allSettled([
this.serverSocket.close(),
this.clientSocket.close(),
this.networkInterfaces.destroy()
])
return this._destroying
}
@ -135,19 +131,32 @@ module.exports = class IO {
}
async _bindSockets () {
const serverSocket = typeof this._bind === 'function' ? await this._bind() : await bind.udp(this._bind)
const serverSocket = this.udx.createSocket()
try {
serverSocket.bind(this._port)
} catch {
try {
serverSocket.bind()
} catch (err) {
await serverSocket.close()
throw err
}
}
const clientSocket = this.udx.createSocket()
try {
// TODO: we should reroll the socket is it's close to our preferred range of ports
// to avoid it being accidentally opened
// We'll prop need additional APIs for that
this.clientSocket = await bind.udp()
this.serverSocket = serverSocket
clientSocket.bind()
} catch (err) {
await new Promise((resolve) => serverSocket.close(resolve))
await serverSocket.close()
await clientSocket.close()
throw err
}
this.clientSocket = clientSocket
this.serverSocket = serverSocket
this.serverSocket.on('message', this.onmessage.bind(this, this.serverSocket))
this.clientSocket.on('message', this.onmessage.bind(this, this.clientSocket))
@ -242,8 +251,7 @@ class Request {
reply (value, opts = {}) {
const socket = opts.socket || this.socket
const to = opts.to || this.from
const onflush = opts.onflush || null
this._sendReply(0, value || null, opts.token !== false, opts.closerNodes !== false, to, socket, onflush)
this._sendReply(0, value || null, opts.token !== false, opts.closerNodes !== false, to, socket)
}
error (code, opts = {}) {
@ -255,7 +263,7 @@ class Request {
relay (value, to, opts) {
const socket = (opts && opts.socket) || this.socket
const buffer = this._encodeRequest(null, value, to, socket)
socket.send(buffer, 0, buffer.byteLength, to.port, to.host)
socket.trySend(buffer, to.port, to.host)
}
send (force = false) {
@ -280,7 +288,7 @@ class Request {
if (this.destroyed) return
this.sent++
this._io.congestion.send()
this.socket.send(this._buffer, 0, this._buffer.byteLength, this.to.port, this.to.host)
this.socket.trySend(this._buffer, this.to.port, this.to.host)
if (this._timeout) clearTimeout(this._timeout)
this._timeout = setTimeout(oncycle, 1000, this)
}
@ -298,7 +306,7 @@ class Request {
this.onerror(err || errors.createDestroyedError(), this)
}
_sendReply (error, value, token, hasCloserNodes, from, socket, onflush) {
_sendReply (error, value, token, hasCloserNodes, from, socket) {
if (socket === null || this.destroyed) return
const id = this._io.ephemeral === false && socket === this._io.serverSocket
@ -324,7 +332,7 @@ class Request {
if (error > 0) c.uint.encode(state, error)
if (value) c.buffer.encode(state, value)
socket.send(state.buffer, 0, state.buffer.byteLength, from.port, from.host, onflush)
socket.trySend(state.buffer, from.port, from.host)
}
_encodeRequest (token, value, to, socket) {

7
package.json

@ -5,18 +5,19 @@
"main": "index.js",
"dependencies": {
"b4a": "^1.3.1",
"bind-easy": "^1.0.0",
"compact-encoding": "^2.1.0",
"compact-encoding-net": "^1.0.1",
"events": "^3.3.0",
"fast-fifo": "^1.0.0",
"kademlia-routing-table": "^1.0.0",
"nat-sampler": "^1.0.1",
"sodium-universal": "^3.0.4",
"streamx": "^2.10.3",
"time-ordered-set": "^1.0.2"
"time-ordered-set": "^1.0.2",
"udx-native": "^1.1.0"
},
"devDependencies": {
"brittle": "^1.4.3",
"brittle": "^2.3.1",
"standard": "^16.0.3"
},
"scripts": {

25
test.js

@ -2,14 +2,12 @@ const test = require('brittle')
const dgram = require('dgram')
const DHT = require('./')
test.configure({ serial: true })
test('make tiny swarm', async function (t) {
await makeSwarm(2, t)
t.pass('could make swarm')
})
test('make bigger swarm', async function (t) {
test('make bigger swarm', { timeout: 60000 }, async function (t) {
const swarm = await makeSwarm(500, t)
const targetNode = swarm[25]
@ -165,23 +163,6 @@ test('request with/without retries', async function (t) {
t.is(tries, 4)
})
test('reply onflush', async function (t) {
const [, a, b] = await makeSwarm(3, t)
let flushed = false
b.on('request', function (req) {
req.reply(null, {
onflush () {
flushed = true
}
})
})
await a.request({ command: 42 }, { host: '127.0.0.1', port: b.address().port })
t.ok(flushed)
})
test('shorthand commit', async function (t) {
const swarm = await makeSwarm(40, t)
@ -290,12 +271,12 @@ test('addNode / nodes option', async function (t) {
test('set bind', async function (t) {
const port = await freePort()
const a = new DHT({ bind: port, firewalled: false })
const a = new DHT({ port, firewalled: false })
await a.ready()
t.alike(a.address().port, port, 'bound to explicit port')
const b = new DHT({ bind: port })
const b = new DHT({ port })
await b.ready()
t.not(b.address().port, port, 'bound to different port as explicit one is taken')

Loading…
Cancel
Save