Browse Source

add onflush hook to reply

session-estimator
Mathias Buus 3 years ago
parent
commit
9e154a2f50
  1. 9
      lib/io.js
  2. 25
      test.js

9
lib/io.js

@ -238,7 +238,8 @@ class Request {
reply (value, opts = {}) {
const socket = opts.socket || this.socket
const to = opts.to || this.from
this._sendReply(0, value || null, opts.token !== false, opts.closerNodes !== false, to, socket)
const onflush = opts.onflush || null
this._sendReply(0, value || null, opts.token !== false, opts.closerNodes !== false, to, socket, onflush)
}
error (code, opts = {}) {
@ -268,7 +269,7 @@ class Request {
}
sendReply (error, value, token, hasCloserNodes) {
this._sendReply(error, value, token, hasCloserNodes, this.from, this.socket)
this._sendReply(error, value, token, hasCloserNodes, this.from, this.socket, null)
}
_sendNow () {
@ -293,7 +294,7 @@ class Request {
this.onerror(err || DESTROY, this)
}
_sendReply (error, value, token, hasCloserNodes, from, socket) {
_sendReply (error, value, token, hasCloserNodes, from, socket, onflush) {
if (socket === null || this.destroyed) return
const id = this._io.ephemeral === false && socket === this._io.serverSocket
@ -319,7 +320,7 @@ class Request {
if (error > 0) c.uint.encode(state, error)
if (value) c.buffer.encode(state, value)
socket.send(state.buffer, 0, state.buffer.byteLength, from.port, from.host)
socket.send(state.buffer, 0, state.buffer.byteLength, from.port, from.host, onflush)
}
_encodeRequest (token, value, socket) {

25
test.js

@ -137,7 +137,7 @@ tape('request with/without retries', async function (t) {
})
try {
await a.request({ command: 'nope', target: Buffer.alloc(32) }, { host: '127.0.0.1', port: b.address().port })
await a.request({ command: 'nope' }, { host: '127.0.0.1', port: b.address().port })
} catch {
// do nothing
}
@ -145,7 +145,7 @@ tape('request with/without retries', async function (t) {
t.same(tries, 3)
try {
await a.request({ command: 'nope', target: Buffer.alloc(32) }, { host: '127.0.0.1', port: b.address().port }, { retry: false })
await a.request({ command: 'nope' }, { host: '127.0.0.1', port: b.address().port }, { retry: false })
} catch {
// do nothing
}
@ -157,6 +157,27 @@ tape('request with/without retries', async function (t) {
b.destroy()
})
tape('reply onflush', async function (t) {
const [bootstrap, a, b] = await makeSwarm(3)
let flushed = false
b.on('request', function (req) {
req.reply(null, {
onflush () {
flushed = true
}
})
})
await a.request({ command: 'hello' }, { host: '127.0.0.1', port: b.address().port })
t.ok(flushed)
bootstrap.destroy()
a.destroy()
b.destroy()
})
tape('shorthand commit', async function (t) {
const swarm = await makeSwarm(40)

Loading…
Cancel
Save