Browse Source

Replace `Buffer` with `b4a` (#37)

session-estimator
Kasper Isager Dalsgarð 3 years ago
committed by GitHub
parent
commit
552e7a4464
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      index.js
  2. 17
      lib/io.js
  3. 3
      lib/peer.js
  4. 5
      lib/query.js
  5. 1
      package.json

15
index.js

@ -5,13 +5,14 @@ const TOS = require('time-ordered-set')
const sodium = require('sodium-universal')
const c = require('compact-encoding')
const NatSampler = require('nat-sampler')
const b4a = require('b4a')
const IO = require('./lib/io')
const Query = require('./lib/query')
const peer = require('./lib/peer')
const { UNKNOWN_COMMAND, INVALID_TOKEN } = require('./lib/errors')
const { PING, PING_NAT, FIND_NODE, DOWN_HINT } = require('./lib/commands')
const TMP = Buffer.allocUnsafe(32)
const TMP = b4a.allocUnsafe(32)
const TICK_INTERVAL = 5000
const SLEEPING_INTERVAL = 3 * TICK_INTERVAL
const STABLE_TICKS = 240 // if nothing major bad happens in ~20mins we can consider this node stable (if nat is friendly)
@ -194,7 +195,7 @@ class DHT extends EventEmitter {
if (!first) return
first = false
const value = Buffer.allocUnsafe(2)
const value = b4a.allocUnsafe(2)
c.uint16.encode({ start: 0, end: 2, buffer: value }, self.io.serverSocket.address().port)
self._request(data.from, true, PING_NAT, null, value, () => { testNat = true }, noop)
@ -269,7 +270,7 @@ class DHT extends EventEmitter {
}
_addNode (node) {
if (this.nodes.has(node) || node.id.equals(this.table.id)) return
if (this.nodes.has(node) || b4a.equals(node.id, this.table.id)) return
node.added = node.pinged = node.seen = this._tick
@ -529,13 +530,13 @@ class DHT extends EventEmitter {
// as possible, vs blindly copying them over...
// all good! copy over the old routing table to the new one
if (!this.table.id.equals(id)) {
if (!b4a.equals(this.table.id, id)) {
const nodes = this.table.toArray()
this.table = this.io.table = new Table(id)
for (const node of nodes) {
if (node.id.equals(id)) continue
if (b4a.equals(node.id, id)) continue
if (!this.table.add(node)) this.nodes.remove(node)
}
@ -586,7 +587,7 @@ class DHT extends EventEmitter {
if (nodes.length === 0) return true
const hosts = []
const value = Buffer.allocUnsafe(2)
const value = b4a.allocUnsafe(2)
c.uint16.encode({ start: 0, end: 2, buffer: value }, this.io.serverSocket.address().port)
@ -659,7 +660,7 @@ function parseNode (s) {
}
function randomBytes (n) {
const b = Buffer.alloc(n)
const b = b4a.alloc(n)
sodium.randombytes_buf(b)
return b
}

17
lib/io.js

@ -2,13 +2,14 @@ const FIFO = require('fast-fifo')
const sodium = require('sodium-universal')
const c = require('compact-encoding')
const bind = require('bind-easy')
const b4a = require('b4a')
const peer = require('./peer')
const { INVALID_TOKEN, TIMEOUT, DESTROY } = require('./errors')
const VERSION = 0b11
const RESPONSE_ID = (0b0001 << 4) | VERSION
const REQUEST_ID = (0b0000 << 4) | VERSION
const TMP = Buffer.alloc(32)
const TMP = b4a.alloc(32)
const EMPTY_ARRAY = []
module.exports = class IO {
@ -46,7 +47,7 @@ module.exports = class IO {
if (buffer[0] === REQUEST_ID) {
const req = Request.decode(this, socket, from, state)
if (req === null) return
if (req.token !== null && !req.token.equals(this.token(req.from, 1)) && !req.token.equals(this.token(req.from, 0))) {
if (req.token !== null && !b4a.equals(req.token, this.token(req.from, 1)) && !b4a.equals(req.token, this.token(req.from, 0))) {
req.error(INVALID_TOKEN, { token: true })
return
}
@ -82,14 +83,14 @@ module.exports = class IO {
token (addr, i) {
if (this._secrets === null) {
const buf = Buffer.alloc(64)
const buf = b4a.alloc(64)
this._secrets = [buf.subarray(0, 32), buf.subarray(32, 64)]
sodium.randombytes_buf(this._secrets[0])
sodium.randombytes_buf(this._secrets[1])
}
const token = Buffer.allocUnsafe(32)
sodium.crypto_generichash(token, Buffer.from(addr.host), this._secrets[i])
const token = b4a.allocUnsafe(32)
sodium.crypto_generichash(token, b4a.from(addr.host), this._secrets[i])
return token
}
@ -309,7 +310,7 @@ class Request {
if (error > 0) c.uint.preencode(state, error)
if (value) c.buffer.preencode(state, value)
state.buffer = Buffer.allocUnsafe(state.end)
state.buffer = b4a.allocUnsafe(state.end)
state.buffer[state.start++] = RESPONSE_ID
state.buffer[state.start++] = (id ? 1 : 0) | (token ? 2 : 0) | (closerNodes.length > 0 ? 4 : 0) | (error > 0 ? 8 : 0) | (value ? 16 : 0)
@ -337,7 +338,7 @@ class Request {
if (this.target) state.end += 32
if (value) c.buffer.preencode(state, value)
state.buffer = Buffer.allocUnsafe(state.end)
state.buffer = b4a.allocUnsafe(state.end)
state.buffer[state.start++] = REQUEST_ID
state.buffer[state.start++] = (id ? 1 : 0) | (token ? 2 : 0) | (this.internal ? 4 : 0) | (this.target ? 8 : 0) | (value ? 16 : 0)
@ -420,5 +421,5 @@ function decodeReply (from, state) {
}
function validateId (id, from) {
return peer.id(from.host, from.port, TMP).equals(id) ? id : null
return b4a.equals(peer.id(from.host, from.port, TMP), id) ? id : null
}

3
lib/peer.js

@ -1,6 +1,7 @@
const sodium = require('sodium-universal')
const c = require('compact-encoding')
const net = require('compact-encoding-net')
const b4a = require('b4a')
const ipv4 = {
...net.ipv4Address,
@ -16,7 +17,7 @@ const ipv4 = {
module.exports = { id, ipv4, ipv4Array: c.array(ipv4) }
function id (host, port, out = Buffer.allocUnsafe(32)) {
function id (host, port, out = b4a.allocUnsafe(32)) {
const addr = out.subarray(0, 6)
ipv4.encode(
{ start: 0, end: 6, buffer: addr },

5
lib/query.js

@ -1,4 +1,5 @@
const { Readable } = require('streamx')
const b4a = require('b4a')
const peer = require('./peer')
const { DOWN_HINT } = require('./commands')
@ -240,7 +241,7 @@ module.exports = class Query extends Readable {
if (m.closerNodes !== null) {
for (const node of m.closerNodes) {
node.id = peer.id(node.host, node.port)
if (node.id.equals(this.dht.table.id)) continue
if (b4a.equals(node.id, this.dht.table.id)) continue
// TODO: we could continue here instead of breaking to ensure that one of the nodes in the closer list
// is later marked as DOWN that we gossip that back
if (!this._addPending(node, m.from)) break
@ -281,7 +282,7 @@ module.exports = class Query extends Readable {
}
_downHint (node, down) {
const state = { start: 0, end: 6, buffer: Buffer.allocUnsafe(6) }
const state = { start: 0, end: 6, buffer: b4a.allocUnsafe(6) }
peer.ipv4.encode(state, down)
this.dht._request(node, true, DOWN_HINT, null, state.buffer, noop, noop)
}

1
package.json

@ -4,6 +4,7 @@
"description": "Make RPC calls over a Kademlia based DHT",
"main": "index.js",
"dependencies": {
"b4a": "^1.3.1",
"bind-easy": "^1.0.0",
"compact-encoding": "^2.1.0",
"compact-encoding-net": "^1.0.1",

Loading…
Cancel
Save