Browse Source

stream_base: expose `bytesRead` getter

This will provide `bytesRead` data on consumed sockets.

Fix: #3021
PR-URL: https://github.com/nodejs/node/pull/6284
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
v4.x
Fedor Indutny 9 years ago
committed by Myles Borins
parent
commit
55319fe798
  1. 18
      lib/net.js
  2. 1
      src/env.h
  3. 17
      src/stream_base-inl.h
  4. 13
      src/stream_base.h
  5. 37
      test/parallel/test-net-bytes-read.js

18
lib/net.js

@ -97,7 +97,6 @@ exports._normalizeConnectArgs = normalizeConnectArgs;
// called when creating new Socket, or when re-using a closed Socket // called when creating new Socket, or when re-using a closed Socket
function initSocketHandle(self) { function initSocketHandle(self) {
self.destroyed = false; self.destroyed = false;
self.bytesRead = 0;
self._bytesDispatched = 0; self._bytesDispatched = 0;
self._sockname = null; self._sockname = null;
@ -112,6 +111,10 @@ function initSocketHandle(self) {
} }
} }
const BYTES_READ = Symbol('bytesRead');
function Socket(options) { function Socket(options) {
if (!(this instanceof Socket)) return new Socket(options); if (!(this instanceof Socket)) return new Socket(options);
@ -179,6 +182,9 @@ function Socket(options) {
// Reserve properties // Reserve properties
this.server = null; this.server = null;
this._server = null; this._server = null;
// Used after `.destroy()`
this[BYTES_READ] = 0;
} }
util.inherits(Socket, stream.Duplex); util.inherits(Socket, stream.Duplex);
@ -472,6 +478,9 @@ Socket.prototype._destroy = function(exception, cb) {
if (this !== process.stderr) if (this !== process.stderr)
debug('close handle'); debug('close handle');
var isException = exception ? true : false; var isException = exception ? true : false;
// `bytesRead` should be accessible after `.destroy()`
this[BYTES_READ] = this._handle.bytesRead;
this._handle.close(function() { this._handle.close(function() {
debug('emit close'); debug('emit close');
self.emit('close', isException); self.emit('close', isException);
@ -523,10 +532,6 @@ function onread(nread, buffer) {
// will prevent this from being called again until _read() gets // will prevent this from being called again until _read() gets
// called again. // called again.
// if it's not enough data, we'll just call handle.readStart()
// again right away.
self.bytesRead += nread;
// Optimization: emit the original buffer with end points // Optimization: emit the original buffer with end points
var ret = self.push(buffer); var ret = self.push(buffer);
@ -582,6 +587,9 @@ Socket.prototype._getpeername = function() {
return this._peername; return this._peername;
}; };
Socket.prototype.__defineGetter__('bytesRead', function() {
return this._handle ? this._handle.bytesRead : this[BYTES_READ];
});
Socket.prototype.__defineGetter__('remoteAddress', function() { Socket.prototype.__defineGetter__('remoteAddress', function() {
return this._getpeername().address; return this._getpeername().address;

1
src/env.h

@ -54,6 +54,7 @@ namespace node {
V(buffer_string, "buffer") \ V(buffer_string, "buffer") \
V(bytes_string, "bytes") \ V(bytes_string, "bytes") \
V(bytes_parsed_string, "bytesParsed") \ V(bytes_parsed_string, "bytesParsed") \
V(bytes_read_string, "bytesRead") \
V(callback_string, "callback") \ V(callback_string, "callback") \
V(change_string, "change") \ V(change_string, "change") \
V(oncertcb_string, "oncertcb") \ V(oncertcb_string, "oncertcb") \

17
src/stream_base-inl.h

@ -43,6 +43,13 @@ void StreamBase::AddMethods(Environment* env,
v8::DEFAULT, v8::DEFAULT,
attributes); attributes);
t->InstanceTemplate()->SetAccessor(env->bytes_read_string(),
GetBytesRead<Base>,
nullptr,
env->as_external(),
v8::DEFAULT,
attributes);
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>); env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>);
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>); env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>);
if ((flags & kFlagNoShutdown) == 0) if ((flags & kFlagNoShutdown) == 0)
@ -79,6 +86,16 @@ void StreamBase::GetFD(Local<String> key,
} }
template <class Base>
void StreamBase::GetBytesRead(Local<String> key,
const PropertyCallbackInfo<Value>& args) {
StreamBase* wrap = Unwrap<Base>(args.Holder());
// uint64_t -> double. 53bits is enough for all real cases.
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
}
template <class Base> template <class Base>
void StreamBase::GetExternal(Local<String> key, void StreamBase::GetExternal(Local<String> key,
const PropertyCallbackInfo<Value>& args) { const PropertyCallbackInfo<Value>& args) {

13
src/stream_base.h

@ -136,7 +136,7 @@ class StreamResource {
uv_handle_type pending, uv_handle_type pending,
void* ctx); void* ctx);
StreamResource() { StreamResource() : bytes_read_(0) {
} }
virtual ~StreamResource() = default; virtual ~StreamResource() = default;
@ -160,9 +160,11 @@ class StreamResource {
alloc_cb_.fn(size, buf, alloc_cb_.ctx); alloc_cb_.fn(size, buf, alloc_cb_.ctx);
} }
inline void OnRead(size_t nread, inline void OnRead(ssize_t nread,
const uv_buf_t* buf, const uv_buf_t* buf,
uv_handle_type pending = UV_UNKNOWN_HANDLE) { uv_handle_type pending = UV_UNKNOWN_HANDLE) {
if (nread > 0)
bytes_read_ += static_cast<uint64_t>(nread);
if (!read_cb_.is_empty()) if (!read_cb_.is_empty())
read_cb_.fn(nread, buf, pending, read_cb_.ctx); read_cb_.fn(nread, buf, pending, read_cb_.ctx);
} }
@ -182,6 +184,9 @@ class StreamResource {
Callback<AfterWriteCb> after_write_cb_; Callback<AfterWriteCb> after_write_cb_;
Callback<AllocCb> alloc_cb_; Callback<AllocCb> alloc_cb_;
Callback<ReadCb> read_cb_; Callback<ReadCb> read_cb_;
uint64_t bytes_read_;
friend class StreamBase;
}; };
class StreamBase : public StreamResource { class StreamBase : public StreamResource {
@ -249,6 +254,10 @@ class StreamBase : public StreamResource {
static void GetExternal(v8::Local<v8::String> key, static void GetExternal(v8::Local<v8::String> key,
const v8::PropertyCallbackInfo<v8::Value>& args); const v8::PropertyCallbackInfo<v8::Value>& args);
template <class Base>
static void GetBytesRead(v8::Local<v8::String> key,
const v8::PropertyCallbackInfo<v8::Value>& args);
template <class Base, template <class Base,
int (StreamBase::*Method)( // NOLINT(whitespace/parens) int (StreamBase::*Method)( // NOLINT(whitespace/parens)
const v8::FunctionCallbackInfo<v8::Value>& args)> const v8::FunctionCallbackInfo<v8::Value>& args)>

37
test/parallel/test-net-bytes-read.js

@ -0,0 +1,37 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const net = require('net');
const big = Buffer(1024 * 1024);
const server = net.createServer((socket) => {
socket.end(big);
server.close();
}).listen(common.PORT, () => {
let prev = 0;
function checkRaise(value) {
assert(value > prev);
prev = value;
}
const socket = net.connect(common.PORT, () => {
socket.on('data', (chunk) => {
checkRaise(socket.bytesRead);
});
socket.on('end', common.mustCall(() => {
assert.equal(socket.bytesRead, prev);
assert.equal(big.length, prev);
}));
socket.on('close', common.mustCall(() => {
assert(!socket._handle);
assert.equal(socket.bytesRead, prev);
assert.equal(big.length, prev);
}));
});
socket.end();
});
Loading…
Cancel
Save