diff --git a/lib/io.js b/lib/io.js index 369e202..32823b5 100644 --- a/lib/io.js +++ b/lib/io.js @@ -238,7 +238,8 @@ class Request { reply (value, opts = {}) { const socket = opts.socket || this.socket const to = opts.to || this.from - this._sendReply(0, value || null, opts.token !== false, opts.closerNodes !== false, to, socket) + const onflush = opts.onflush || null + this._sendReply(0, value || null, opts.token !== false, opts.closerNodes !== false, to, socket, onflush) } error (code, opts = {}) { @@ -268,7 +269,7 @@ class Request { } sendReply (error, value, token, hasCloserNodes) { - this._sendReply(error, value, token, hasCloserNodes, this.from, this.socket) + this._sendReply(error, value, token, hasCloserNodes, this.from, this.socket, null) } _sendNow () { @@ -293,7 +294,7 @@ class Request { this.onerror(err || DESTROY, this) } - _sendReply (error, value, token, hasCloserNodes, from, socket) { + _sendReply (error, value, token, hasCloserNodes, from, socket, onflush) { if (socket === null || this.destroyed) return const id = this._io.ephemeral === false && socket === this._io.serverSocket @@ -319,7 +320,7 @@ class Request { if (error > 0) c.uint.encode(state, error) if (value) c.buffer.encode(state, value) - socket.send(state.buffer, 0, state.buffer.byteLength, from.port, from.host) + socket.send(state.buffer, 0, state.buffer.byteLength, from.port, from.host, onflush) } _encodeRequest (token, value, socket) { diff --git a/test.js b/test.js index 462cb87..37bdb9f 100644 --- a/test.js +++ b/test.js @@ -137,7 +137,7 @@ tape('request with/without retries', async function (t) { }) try { - await a.request({ command: 'nope', target: Buffer.alloc(32) }, { host: '127.0.0.1', port: b.address().port }) + await a.request({ command: 'nope' }, { host: '127.0.0.1', port: b.address().port }) } catch { // do nothing } @@ -145,7 +145,7 @@ tape('request with/without retries', async function (t) { t.same(tries, 3) try { - await a.request({ command: 'nope', target: Buffer.alloc(32) }, { host: '127.0.0.1', port: b.address().port }, { retry: false }) + await a.request({ command: 'nope' }, { host: '127.0.0.1', port: b.address().port }, { retry: false }) } catch { // do nothing } @@ -157,6 +157,27 @@ tape('request with/without retries', async function (t) { b.destroy() }) +tape('reply onflush', async function (t) { + const [bootstrap, a, b] = await makeSwarm(3) + + let flushed = false + + b.on('request', function (req) { + req.reply(null, { + onflush () { + flushed = true + } + }) + }) + + await a.request({ command: 'hello' }, { host: '127.0.0.1', port: b.address().port }) + t.ok(flushed) + + bootstrap.destroy() + a.destroy() + b.destroy() +}) + tape('shorthand commit', async function (t) { const swarm = await makeSwarm(40)