Browse Source

uv_write2 uv_read2_start binding

v0.7.4-release
Ryan Dahl 13 years ago
parent
commit
471c5701c3
  1. 8
      src/pipe_wrap.cc
  2. 2
      src/pipe_wrap.h
  3. 80
      src/stream_wrap.cc
  4. 7
      src/stream_wrap.h
  5. 82
      src/tcp_wrap.cc
  6. 37
      src/tcp_wrap.h

8
src/pipe_wrap.cc

@ -86,16 +86,16 @@ Handle<Value> PipeWrap::New(const Arguments& args) {
assert(args.IsConstructCall());
HandleScope scope;
PipeWrap* wrap = new PipeWrap(args.This());
PipeWrap* wrap = new PipeWrap(args.This(), args[0]->IsTrue());
assert(wrap);
return scope.Close(args.This());
}
PipeWrap::PipeWrap(Handle<Object> object) : StreamWrap(object,
(uv_stream_t*) &handle_) {
int r = uv_pipe_init(uv_default_loop(), &handle_, 0);
PipeWrap::PipeWrap(Handle<Object> object, bool ipc)
: StreamWrap(object, (uv_stream_t*) &handle_) {
int r = uv_pipe_init(uv_default_loop(), &handle_, ipc);
assert(r == 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
handle_.data = reinterpret_cast<void*>(this);

2
src/pipe_wrap.h

@ -12,7 +12,7 @@ class PipeWrap : StreamWrap {
static void Initialize(v8::Handle<v8::Object> target);
private:
PipeWrap(v8::Handle<v8::Object> object);
PipeWrap(v8::Handle<v8::Object> object, bool ipc);
static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> Bind(const v8::Arguments& args);

80
src/stream_wrap.cc

@ -2,6 +2,7 @@
#include <node_buffer.h>
#include <handle_wrap.h>
#include <stream_wrap.h>
#include <tcp_wrap.h>
#include <req_wrap.h>
@ -95,7 +96,14 @@ Handle<Value> StreamWrap::ReadStart(const Arguments& args) {
UNWRAP
int r = uv_read_start(wrap->stream_, OnAlloc, OnRead);
bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE &&
((uv_pipe_t*)wrap->stream_)->ipc;
int r;
if (ipc_pipe) {
r = uv_read2_start(wrap->stream_, OnAlloc, OnRead2);
} else {
r = uv_read_start(wrap->stream_, OnAlloc, OnRead);
}
// Error starting the tcp.
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
@ -170,9 +178,13 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
return buf;
}
void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending) {
HandleScope scope;
assert(pending == UV_UNKNOWN_HANDLE); // TODO
StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
// We should not be getting this callback if someone as already called
@ -201,25 +213,59 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
}
if (nread > 0) {
Local<Value> argv[3] = {
int argc = 3;
Local<Value> argv[4] = {
slab_v,
Integer::New(wrap->slab_offset_),
Integer::New(nread)
};
MakeCallback(wrap->object_, "onread", 3, argv);
if (pending == UV_TCP) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();
// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));
int r = uv_accept(handle, pending_wrap->GetStream());
assert(r == 0);
argv[3] = pending_obj;
argc++;
} else {
// We only support sending UV_TCP right now.
assert(pending == UV_UNKNOWN_HANDLE);
}
MakeCallback(wrap->object_, "onread", argc, argv);
}
}
void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
OnReadCommon(handle, nread, buf, UV_UNKNOWN_HANDLE);
}
void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
uv_handle_type pending) {
OnReadCommon((uv_stream_t*)handle, nread, buf, pending);
}
Handle<Value> StreamWrap::Write(const Arguments& args) {
HandleScope scope;
UNWRAP
bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE &&
((uv_pipe_t*)wrap->stream_)->ipc;
// The first argument is a buffer.
assert(Buffer::HasInstance(args[0]));
Local<Object> buffer_obj = args[0]->ToObject();
size_t offset = 0;
size_t length = Buffer::Length(buffer_obj);
@ -239,7 +285,29 @@ Handle<Value> StreamWrap::Write(const Arguments& args) {
buf.base = Buffer::Data(buffer_obj) + offset;
buf.len = length;
int r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite);
int r;
if (!ipc_pipe) {
r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite);
} else {
uv_stream_t* send_stream = NULL;
if (args.Length() > 3) {
assert(args[3]->IsObject());
Local<Object> send_stream_obj = args[3]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
send_stream = send_stream_wrap->GetStream();
}
r = uv_write2(&req_wrap->req_,
wrap->stream_,
&buf,
1,
send_stream,
StreamWrap::AfterWrite);
}
req_wrap->Dispatched();

7
src/stream_wrap.h

@ -32,9 +32,14 @@ class StreamWrap : public HandleWrap {
// Callbacks for libuv
static void AfterWrite(uv_write_t* req, int status);
static uv_buf_t OnAlloc(uv_handle_t* handle, size_t suggested_size);
static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf);
static void AfterShutdown(uv_shutdown_t* req, int status);
static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf);
static void OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
uv_handle_type pending);
static void OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending);
size_t slab_offset_;
uv_stream_t* stream_;
};

82
src/tcp_wrap.cc

@ -3,6 +3,7 @@
#include <req_wrap.h>
#include <handle_wrap.h>
#include <stream_wrap.h>
#include <tcp_wrap.h>
// Temporary hack: libuv should provide uv_inet_pton and uv_inet_ntop.
#if defined(__MINGW32__) || defined(_MSC_VER)
@ -45,8 +46,7 @@ using v8::Context;
using v8::Arguments;
using v8::Integer;
Persistent<Function> tcpConstructor;
static Persistent<Function> tcpConstructor;
static Persistent<String> family_symbol;
static Persistent<String> address_symbol;
static Persistent<String> port_symbol;
@ -55,10 +55,13 @@ static Persistent<String> port_symbol;
typedef class ReqWrap<uv_connect_t> ConnectWrap;
class TCPWrap : public StreamWrap {
public:
Local<Object> TCPWrap::Instantiate() {
HandleScope scope;
return scope.Close(tcpConstructor->NewInstance());
}
static void Initialize(Handle<Object> target) {
void TCPWrap::Initialize(Handle<Object> target) {
HandleWrap::Initialize(target);
StreamWrap::Initialize(target);
@ -91,10 +94,10 @@ class TCPWrap : public StreamWrap {
port_symbol = NODE_PSYMBOL("port");
target->Set(String::NewSymbol("TCP"), tcpConstructor);
}
}
private:
static Handle<Value> New(const Arguments& args) {
Handle<Value> TCPWrap::New(const Arguments& args) {
// This constructor should not be exposed to public javascript.
// Therefore we assert that we are not trying to call this as a
// normal function.
@ -105,21 +108,24 @@ class TCPWrap : public StreamWrap {
assert(wrap);
return scope.Close(args.This());
}
}
TCPWrap(Handle<Object> object) : StreamWrap(object,
(uv_stream_t*) &handle_) {
TCPWrap::TCPWrap(Handle<Object> object)
: StreamWrap(object, (uv_stream_t*) &handle_) {
int r = uv_tcp_init(uv_default_loop(), &handle_);
assert(r == 0); // How do we proxy this error up to javascript?
// Suggestion: uv_tcp_init() returns void.
UpdateWriteQueueSize();
}
}
~TCPWrap() {
TCPWrap::~TCPWrap() {
assert(object_.IsEmpty());
}
}
static Handle<Value> GetSockName(const Arguments& args) {
Handle<Value> TCPWrap::GetSockName(const Arguments& args) {
HandleScope scope;
struct sockaddr_storage address;
int family;
@ -154,10 +160,10 @@ class TCPWrap : public StreamWrap {
}
return scope.Close(sockname);
}
}
static Handle<Value> GetPeerName(const Arguments& args) {
Handle<Value> TCPWrap::GetPeerName(const Arguments& args) {
HandleScope scope;
struct sockaddr_storage address;
int family;
@ -192,10 +198,10 @@ class TCPWrap : public StreamWrap {
}
return scope.Close(sockname);
}
}
static Handle<Value> Bind(const Arguments& args) {
Handle<Value> TCPWrap::Bind(const Arguments& args) {
HandleScope scope;
UNWRAP
@ -210,9 +216,10 @@ class TCPWrap : public StreamWrap {
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
return scope.Close(Integer::New(r));
}
}
static Handle<Value> Bind6(const Arguments& args) {
Handle<Value> TCPWrap::Bind6(const Arguments& args) {
HandleScope scope;
UNWRAP
@ -227,9 +234,10 @@ class TCPWrap : public StreamWrap {
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
return scope.Close(Integer::New(r));
}
}
static Handle<Value> Listen(const Arguments& args) {
Handle<Value> TCPWrap::Listen(const Arguments& args) {
HandleScope scope;
UNWRAP
@ -242,9 +250,10 @@ class TCPWrap : public StreamWrap {
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
return scope.Close(Integer::New(r));
}
}
static void OnConnection(uv_stream_t* handle, int status) {
void TCPWrap::OnConnection(uv_stream_t* handle, int status) {
HandleScope scope;
TCPWrap* wrap = static_cast<TCPWrap*>(handle->data);
@ -258,7 +267,7 @@ class TCPWrap : public StreamWrap {
if (status == 0) {
// Instantiate the client javascript object and handle.
Local<Object> client_obj = tcpConstructor->NewInstance();
Local<Object> client_obj = Instantiate();
// Unwrap the client javascript object.
assert(client_obj->InternalFieldCount() > 0);
@ -278,9 +287,10 @@ class TCPWrap : public StreamWrap {
}
MakeCallback(wrap->object_, "onconnection", 1, argv);
}
}
static void AfterConnect(uv_connect_t* req, int status) {
void TCPWrap::AfterConnect(uv_connect_t* req, int status) {
ConnectWrap* req_wrap = (ConnectWrap*) req->data;
TCPWrap* wrap = (TCPWrap*) req->handle->data;
@ -303,9 +313,10 @@ class TCPWrap : public StreamWrap {
MakeCallback(req_wrap->object_, "oncomplete", 3, argv);
delete req_wrap;
}
}
static Handle<Value> Connect(const Arguments& args) {
Handle<Value> TCPWrap::Connect(const Arguments& args) {
HandleScope scope;
UNWRAP
@ -332,9 +343,10 @@ class TCPWrap : public StreamWrap {
} else {
return scope.Close(req_wrap->object_);
}
}
}
static Handle<Value> Connect6(const Arguments& args) {
Handle<Value> TCPWrap::Connect6(const Arguments& args) {
HandleScope scope;
UNWRAP
@ -358,11 +370,7 @@ class TCPWrap : public StreamWrap {
} else {
return scope.Close(req_wrap->object_);
}
}
uv_tcp_t handle_;
};
}
} // namespace node

37
src/tcp_wrap.h

@ -0,0 +1,37 @@
#ifndef TCP_WRAP_H_
#define TCP_WRAP_H_
#include <stream_wrap.h>
namespace node {
class TCPWrap : public StreamWrap {
public:
static v8::Local<v8::Object> Instantiate();
static TCPWrap* Unwrap(v8::Local<v8::Object> obj);
static void Initialize(v8::Handle<v8::Object> target);
private:
TCPWrap(v8::Handle<v8::Object> object);
~TCPWrap();
static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> GetSockName(const v8::Arguments& args);
static v8::Handle<v8::Value> GetPeerName(const v8::Arguments& args);
static v8::Handle<v8::Value> Bind(const v8::Arguments& args);
static v8::Handle<v8::Value> Bind6(const v8::Arguments& args);
static v8::Handle<v8::Value> Listen(const v8::Arguments& args);
static v8::Handle<v8::Value> Connect(const v8::Arguments& args);
static v8::Handle<v8::Value> Connect6(const v8::Arguments& args);
static v8::Handle<v8::Value> Open(const v8::Arguments& args);
static void OnConnection(uv_stream_t* handle, int status);
static void AfterConnect(uv_connect_t* req, int status);
uv_tcp_t handle_;
};
} // namespace node
#endif // TCP_WRAP_H_
Loading…
Cancel
Save