Browse Source

Merge branch 'ipc-binding'

v0.7.4-release
Ryan Dahl 13 years ago
parent
commit
51f7ba4147
  1. 2
      deps/uv/Makefile
  2. 10
      deps/uv/src/unix/process.c
  3. 19
      deps/uv/src/unix/stream.c
  4. 28
      deps/uv/test/test-fs.c
  5. 121
      lib/child_process_uv.js
  6. 3
      src/node.cc
  7. 8
      src/pipe_wrap.cc
  8. 2
      src/pipe_wrap.h
  9. 80
      src/stream_wrap.cc
  10. 7
      src/stream_wrap.h
  11. 474
      src/tcp_wrap.cc
  12. 37
      src/tcp_wrap.h
  13. 0
      test/simple/test-child-process-fork.js

2
deps/uv/Makefile

@ -80,7 +80,7 @@ endif
TESTS=test/blackhole-server.c test/echo-server.c test/test-*.c
BENCHMARKS=test/blackhole-server.c test/echo-server.c test/dns-server.c test/benchmark-*.c
all: uv.a test/run-tests$(E) test/run-benchmarks$(E)
all: uv.a
$(CARES_OBJS): %.o: %.c
$(CC) -o $*.o -c $(CFLAGS) $(CPPFLAGS) $< -DHAVE_CONFIG_H

10
deps/uv/src/unix/process.c

@ -110,6 +110,7 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
#endif
int status;
pid_t pid;
int flags;
uv__handle_init(loop, (uv_handle_t*)process, UV_PROCESS);
loop->counters.process_init++;
@ -255,8 +256,9 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
assert(stdin_pipe[0] >= 0);
uv__close(stdin_pipe[0]);
uv__nonblock(stdin_pipe[1], 1);
flags = UV_WRITABLE | (options.stdin_stream->ipc ? UV_READABLE : 0);
uv__stream_open((uv_stream_t*)options.stdin_stream, stdin_pipe[1],
UV_WRITABLE);
flags);
}
if (stdout_pipe[0] >= 0) {
@ -264,8 +266,9 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
assert(stdout_pipe[1] >= 0);
uv__close(stdout_pipe[1]);
uv__nonblock(stdout_pipe[0], 1);
flags = UV_READABLE | (options.stdout_stream->ipc ? UV_WRITABLE : 0);
uv__stream_open((uv_stream_t*)options.stdout_stream, stdout_pipe[0],
UV_READABLE);
flags);
}
if (stderr_pipe[0] >= 0) {
@ -273,8 +276,9 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
assert(stderr_pipe[1] >= 0);
uv__close(stderr_pipe[1]);
uv__nonblock(stderr_pipe[0], 1);
flags = UV_READABLE | (options.stderr_stream->ipc ? UV_WRITABLE : 0);
uv__stream_open((uv_stream_t*)options.stderr_stream, stderr_pipe[0],
UV_READABLE);
flags);
}
return 0;

19
deps/uv/src/unix/stream.c

@ -563,6 +563,7 @@ static void uv__read(uv_stream_t* stream) {
return;
} else {
/* Successful read */
size_t buflen = buf.len;
if (stream->read_cb) {
stream->read_cb(stream, nread, buf);
@ -599,6 +600,11 @@ static void uv__read(uv_stream_t* stream) {
} else {
stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_UNKNOWN_HANDLE);
}
/* Return if we didn't fill the buffer, there is no more data to read. */
if (nread < buflen) {
return;
}
}
}
}
@ -907,14 +913,11 @@ int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
int uv_read_stop(uv_stream_t* stream) {
uv_tcp_t* tcp = (uv_tcp_t*)stream;
((uv_handle_t*)tcp)->flags &= ~UV_READING;
ev_io_stop(tcp->loop->ev, &tcp->read_watcher);
tcp->read_cb = NULL;
tcp->read2_cb = NULL;
tcp->alloc_cb = NULL;
ev_io_stop(stream->loop->ev, &stream->read_watcher);
stream->flags &= ~UV_READING;
stream->read_cb = NULL;
stream->read2_cb = NULL;
stream->alloc_cb = NULL;
return 0;
}

28
deps/uv/test/test-fs.c

@ -1163,13 +1163,21 @@ TEST_IMPL(fs_symlink) {
TEST_IMPL(fs_utime) {
utime_check_t checkme;
const char* path = ".";
const char* path = "test_file";
double atime;
double mtime;
uv_fs_t req;
int r;
/* Setup. */
loop = uv_default_loop();
unlink(path);
r = uv_fs_open(loop, &req, path, O_RDWR | O_CREAT,
S_IWRITE | S_IREAD, NULL);
ASSERT(r != -1);
ASSERT(req.result != -1);
uv_fs_req_cleanup(&req);
close(r);
atime = mtime = 400497753; /* 1982-09-10 11:22:33 */
@ -1196,24 +1204,35 @@ TEST_IMPL(fs_utime) {
uv_run(loop);
ASSERT(utime_cb_count == 1);
/* Cleanup. */
unlink(path);
return 0;
}
TEST_IMPL(fs_futime) {
utime_check_t checkme;
const char* path = ".";
const char* path = "test_file";
double atime;
double mtime;
uv_file file;
uv_fs_t req;
int r;
/* Setup. */
loop = uv_default_loop();
unlink(path);
r = uv_fs_open(loop, &req, path, O_RDWR | O_CREAT,
S_IWRITE | S_IREAD, NULL);
ASSERT(r != -1);
ASSERT(req.result != -1);
uv_fs_req_cleanup(&req);
close(r);
atime = mtime = 400497753; /* 1982-09-10 11:22:33 */
r = uv_fs_open(loop, &req, path, O_RDONLY, 0, NULL);
r = uv_fs_open(loop, &req, path, O_RDWR, 0, NULL);
ASSERT(r != -1);
ASSERT(req.result != -1);
file = req.result; /* FIXME probably not how it's supposed to be used */
@ -1243,6 +1262,9 @@ TEST_IMPL(fs_futime) {
uv_run(loop);
ASSERT(futime_cb_count == 1);
/* Cleanup. */
unlink(path);
return 0;
}

121
lib/child_process_uv.js

@ -24,11 +24,18 @@ var Process = process.binding('process_wrap').Process;
var inherits = require('util').inherits;
var constants; // if (!constants) constants = process.binding('constants');
var LF = '\n'.charCodeAt(0);
var Pipe;
// constructors for lazy loading
function createPipe() {
var Pipe = process.binding('pipe_wrap').Pipe;
return new Pipe();
function createPipe(ipc) {
// Lazy load
if (!Pipe) {
Pipe = new process.binding('pipe_wrap').Pipe;
}
return new Pipe(ipc);
}
function createSocket(pipe, readable) {
@ -61,6 +68,106 @@ function mergeOptions(target, overrides) {
}
function setupChannel(target, channel) {
target._channel = channel;
var jsonBuffer = '';
channel.onread = function(pool, offset, length) {
if (pool) {
for (var i = 0; i < length; i++) {
if (pool[offset + i] === LF) {
jsonBuffer += pool.toString('ascii', offset, offset + i);
var message = JSON.parse(jsonBuffer);
jsonBuffer = pool.toString('ascii', i, length);
offset = i + 1;
target.emit('message', message);
}
}
} else {
channel.close();
target._channel = null;
}
};
target.send = function(message, fd) {
if (fd) throw new Error("not yet implemented");
if (!target._channel) throw new Error("channel closed");
// For overflow protection don't write if channel queue is too deep.
if (channel.writeQueueSize > 1024 * 1024) {
return false;
}
var buffer = Buffer(JSON.stringify(message) + '\n');
var writeReq = channel.write(buffer);
if (!writeReq) {
throw new Error(errno + " cannot write to IPC channel.");
} else {
writeReq.oncomplete = nop;
}
return true;
};
channel.readStart();
}
function nop() { }
exports.fork = function(modulePath, args, options) {
if (!options) options = {};
if (!args) args = [];
args.unshift(modulePath);
if (options.stdinStream) {
throw new Error("stdinStream not allowed for fork()");
}
if (options.customFds) {
throw new Error("customFds not allowed for fork()");
}
// Leave stdin open for the IPC channel. stdout and stderr should be the
// same as the parent's.
options.customFds = [ -1, 1, 2 ];
// Just need to set this - child process won't actually use the fd.
// For backwards compat - this can be changed to 'NODE_CHANNEL' before v0.6.
options.env = { NODE_CHANNEL_FD: 42 };
// stdin is the IPC channel.
options.stdinStream = createPipe(true);
var child = spawn(process.execPath, args, options);
setupChannel(child, options.stdinStream);
child.on('exit', function() {
if (child._channel) {
child._channel.close();
}
});
return child;
};
exports._forkChild = function() {
// set process.send()
var p = createPipe(true);
p.open(0);
setupChannel(process, p);
};
exports.exec = function(command /*, options, callback */) {
var file, args, options, callback;
@ -213,7 +320,8 @@ var spawn = exports.spawn = function(file, args, options) {
cwd: options ? options.cwd : null,
windowsVerbatimArguments: !!(options && options.windowsVerbatimArguments),
envPairs: envPairs,
customFds: options ? options.customFds : null
customFds: options ? options.customFds : null,
stdinStream: options ? options.stdinStream : null
});
return child;
@ -266,6 +374,9 @@ inherits(ChildProcess, EventEmitter);
function setStreamOption(name, index, options) {
// Skip if we already have options.stdinStream
if (options[name]) return;
if (options.customFds &&
typeof options.customFds[index] == 'number' &&
options.customFds[index] !== -1) {
@ -283,6 +394,8 @@ function setStreamOption(name, index, options) {
ChildProcess.prototype.spawn = function(options) {
var self = this;
debugger;
setStreamOption('stdinStream', 0, options);
setStreamOption('stdoutStream', 1, options);
setStreamOption('stderrStream', 2, options);

3
src/node.cc

@ -1078,6 +1078,9 @@ void MakeCallback(Handle<Object> object,
HandleScope scope;
Local<Value> callback_v = object->Get(String::New(method));
if (!callback_v->IsFunction()) {
fprintf(stderr, "method = %s", method);
}
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);

8
src/pipe_wrap.cc

@ -86,16 +86,16 @@ Handle<Value> PipeWrap::New(const Arguments& args) {
assert(args.IsConstructCall());
HandleScope scope;
PipeWrap* wrap = new PipeWrap(args.This());
PipeWrap* wrap = new PipeWrap(args.This(), args[0]->IsTrue());
assert(wrap);
return scope.Close(args.This());
}
PipeWrap::PipeWrap(Handle<Object> object) : StreamWrap(object,
(uv_stream_t*) &handle_) {
int r = uv_pipe_init(uv_default_loop(), &handle_, 0);
PipeWrap::PipeWrap(Handle<Object> object, bool ipc)
: StreamWrap(object, (uv_stream_t*) &handle_) {
int r = uv_pipe_init(uv_default_loop(), &handle_, ipc);
assert(r == 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
handle_.data = reinterpret_cast<void*>(this);

2
src/pipe_wrap.h

@ -12,7 +12,7 @@ class PipeWrap : StreamWrap {
static void Initialize(v8::Handle<v8::Object> target);
private:
PipeWrap(v8::Handle<v8::Object> object);
PipeWrap(v8::Handle<v8::Object> object, bool ipc);
static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> Bind(const v8::Arguments& args);

80
src/stream_wrap.cc

@ -2,6 +2,7 @@
#include <node_buffer.h>
#include <handle_wrap.h>
#include <stream_wrap.h>
#include <tcp_wrap.h>
#include <req_wrap.h>
@ -95,7 +96,14 @@ Handle<Value> StreamWrap::ReadStart(const Arguments& args) {
UNWRAP
int r = uv_read_start(wrap->stream_, OnAlloc, OnRead);
bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE &&
((uv_pipe_t*)wrap->stream_)->ipc;
int r;
if (ipc_pipe) {
r = uv_read2_start(wrap->stream_, OnAlloc, OnRead2);
} else {
r = uv_read_start(wrap->stream_, OnAlloc, OnRead);
}
// Error starting the tcp.
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
@ -170,9 +178,13 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
return buf;
}
void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending) {
HandleScope scope;
assert(pending == UV_UNKNOWN_HANDLE); // TODO
StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
// We should not be getting this callback if someone as already called
@ -201,25 +213,59 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
}
if (nread > 0) {
Local<Value> argv[3] = {
int argc = 3;
Local<Value> argv[4] = {
slab_v,
Integer::New(wrap->slab_offset_),
Integer::New(nread)
};
MakeCallback(wrap->object_, "onread", 3, argv);
if (pending == UV_TCP) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();
// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));
int r = uv_accept(handle, pending_wrap->GetStream());
assert(r == 0);
argv[3] = pending_obj;
argc++;
} else {
// We only support sending UV_TCP right now.
assert(pending == UV_UNKNOWN_HANDLE);
}
MakeCallback(wrap->object_, "onread", argc, argv);
}
}
void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
OnReadCommon(handle, nread, buf, UV_UNKNOWN_HANDLE);
}
void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
uv_handle_type pending) {
OnReadCommon((uv_stream_t*)handle, nread, buf, pending);
}
Handle<Value> StreamWrap::Write(const Arguments& args) {
HandleScope scope;
UNWRAP
bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE &&
((uv_pipe_t*)wrap->stream_)->ipc;
// The first argument is a buffer.
assert(Buffer::HasInstance(args[0]));
Local<Object> buffer_obj = args[0]->ToObject();
size_t offset = 0;
size_t length = Buffer::Length(buffer_obj);
@ -239,7 +285,29 @@ Handle<Value> StreamWrap::Write(const Arguments& args) {
buf.base = Buffer::Data(buffer_obj) + offset;
buf.len = length;
int r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite);
int r;
if (!ipc_pipe) {
r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite);
} else {
uv_stream_t* send_stream = NULL;
if (args.Length() > 3) {
assert(args[3]->IsObject());
Local<Object> send_stream_obj = args[3]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
send_stream = send_stream_wrap->GetStream();
}
r = uv_write2(&req_wrap->req_,
wrap->stream_,
&buf,
1,
send_stream,
StreamWrap::AfterWrite);
}
req_wrap->Dispatched();

7
src/stream_wrap.h

@ -32,9 +32,14 @@ class StreamWrap : public HandleWrap {
// Callbacks for libuv
static void AfterWrite(uv_write_t* req, int status);
static uv_buf_t OnAlloc(uv_handle_t* handle, size_t suggested_size);
static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf);
static void AfterShutdown(uv_shutdown_t* req, int status);
static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf);
static void OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
uv_handle_type pending);
static void OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending);
size_t slab_offset_;
uv_stream_t* stream_;
};

474
src/tcp_wrap.cc

@ -3,6 +3,7 @@
#include <req_wrap.h>
#include <handle_wrap.h>
#include <stream_wrap.h>
#include <tcp_wrap.h>
// Temporary hack: libuv should provide uv_inet_pton and uv_inet_ntop.
#if defined(__MINGW32__) || defined(_MSC_VER)
@ -45,8 +46,7 @@ using v8::Context;
using v8::Arguments;
using v8::Integer;
Persistent<Function> tcpConstructor;
static Persistent<Function> tcpConstructor;
static Persistent<String> family_symbol;
static Persistent<String> address_symbol;
static Persistent<String> port_symbol;
@ -55,314 +55,322 @@ static Persistent<String> port_symbol;
typedef class ReqWrap<uv_connect_t> ConnectWrap;
class TCPWrap : public StreamWrap {
public:
Local<Object> TCPWrap::Instantiate() {
HandleScope scope;
return scope.Close(tcpConstructor->NewInstance());
}
static void Initialize(Handle<Object> target) {
HandleWrap::Initialize(target);
StreamWrap::Initialize(target);
HandleScope scope;
void TCPWrap::Initialize(Handle<Object> target) {
HandleWrap::Initialize(target);
StreamWrap::Initialize(target);
Local<FunctionTemplate> t = FunctionTemplate::New(New);
t->SetClassName(String::NewSymbol("TCP"));
HandleScope scope;
t->InstanceTemplate()->SetInternalFieldCount(1);
Local<FunctionTemplate> t = FunctionTemplate::New(New);
t->SetClassName(String::NewSymbol("TCP"));
NODE_SET_PROTOTYPE_METHOD(t, "close", HandleWrap::Close);
t->InstanceTemplate()->SetInternalFieldCount(1);
NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart);
NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop);
NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write);
NODE_SET_PROTOTYPE_METHOD(t, "shutdown", StreamWrap::Shutdown);
NODE_SET_PROTOTYPE_METHOD(t, "close", HandleWrap::Close);
NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);
NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen);
NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect);
NODE_SET_PROTOTYPE_METHOD(t, "bind6", Bind6);
NODE_SET_PROTOTYPE_METHOD(t, "connect6", Connect6);
NODE_SET_PROTOTYPE_METHOD(t, "getsockname", GetSockName);
NODE_SET_PROTOTYPE_METHOD(t, "getpeername", GetPeerName);
NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart);
NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop);
NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write);
NODE_SET_PROTOTYPE_METHOD(t, "shutdown", StreamWrap::Shutdown);
tcpConstructor = Persistent<Function>::New(t->GetFunction());
NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);
NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen);
NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect);
NODE_SET_PROTOTYPE_METHOD(t, "bind6", Bind6);
NODE_SET_PROTOTYPE_METHOD(t, "connect6", Connect6);
NODE_SET_PROTOTYPE_METHOD(t, "getsockname", GetSockName);
NODE_SET_PROTOTYPE_METHOD(t, "getpeername", GetPeerName);
family_symbol = NODE_PSYMBOL("family");
address_symbol = NODE_PSYMBOL("address");
port_symbol = NODE_PSYMBOL("port");
tcpConstructor = Persistent<Function>::New(t->GetFunction());
target->Set(String::NewSymbol("TCP"), tcpConstructor);
}
family_symbol = NODE_PSYMBOL("family");
address_symbol = NODE_PSYMBOL("address");
port_symbol = NODE_PSYMBOL("port");
private:
static Handle<Value> New(const Arguments& args) {
// This constructor should not be exposed to public javascript.
// Therefore we assert that we are not trying to call this as a
// normal function.
assert(args.IsConstructCall());
target->Set(String::NewSymbol("TCP"), tcpConstructor);
}
HandleScope scope;
TCPWrap* wrap = new TCPWrap(args.This());
assert(wrap);
return scope.Close(args.This());
}
Handle<Value> TCPWrap::New(const Arguments& args) {
// This constructor should not be exposed to public javascript.
// Therefore we assert that we are not trying to call this as a
// normal function.
assert(args.IsConstructCall());
TCPWrap(Handle<Object> object) : StreamWrap(object,
(uv_stream_t*) &handle_) {
int r = uv_tcp_init(uv_default_loop(), &handle_);
assert(r == 0); // How do we proxy this error up to javascript?
// Suggestion: uv_tcp_init() returns void.
UpdateWriteQueueSize();
}
HandleScope scope;
TCPWrap* wrap = new TCPWrap(args.This());
assert(wrap);
~TCPWrap() {
assert(object_.IsEmpty());
}
return scope.Close(args.This());
}
TCPWrap::TCPWrap(Handle<Object> object)
: StreamWrap(object, (uv_stream_t*) &handle_) {
int r = uv_tcp_init(uv_default_loop(), &handle_);
assert(r == 0); // How do we proxy this error up to javascript?
// Suggestion: uv_tcp_init() returns void.
UpdateWriteQueueSize();
}
TCPWrap::~TCPWrap() {
assert(object_.IsEmpty());
}
static Handle<Value> GetSockName(const Arguments& args) {
HandleScope scope;
struct sockaddr_storage address;
int family;
int port;
char ip[INET6_ADDRSTRLEN];
UNWRAP
int addrlen = sizeof(address);
int r = uv_tcp_getsockname(&wrap->handle_,
reinterpret_cast<sockaddr*>(&address),
&addrlen);
Local<Object> sockname = Object::New();
if (r != 0) {
SetErrno(uv_last_error(uv_default_loop()).code);
} else {
family = address.ss_family;
if (family == AF_INET) {
struct sockaddr_in* addrin = (struct sockaddr_in*)&address;
uv_inet_ntop(AF_INET, &(addrin->sin_addr), ip, INET6_ADDRSTRLEN);
port = ntohs(addrin->sin_port);
} else if (family == AF_INET6) {
struct sockaddr_in6* addrin6 = (struct sockaddr_in6*)&address;
uv_inet_ntop(AF_INET6, &(addrin6->sin6_addr), ip, INET6_ADDRSTRLEN);
port = ntohs(addrin6->sin6_port);
}
sockname->Set(port_symbol, Integer::New(port));
sockname->Set(family_symbol, Integer::New(family));
sockname->Set(address_symbol, String::New(ip));
Handle<Value> TCPWrap::GetSockName(const Arguments& args) {
HandleScope scope;
struct sockaddr_storage address;
int family;
int port;
char ip[INET6_ADDRSTRLEN];
UNWRAP
int addrlen = sizeof(address);
int r = uv_tcp_getsockname(&wrap->handle_,
reinterpret_cast<sockaddr*>(&address),
&addrlen);
Local<Object> sockname = Object::New();
if (r != 0) {
SetErrno(uv_last_error(uv_default_loop()).code);
} else {
family = address.ss_family;
if (family == AF_INET) {
struct sockaddr_in* addrin = (struct sockaddr_in*)&address;
uv_inet_ntop(AF_INET, &(addrin->sin_addr), ip, INET6_ADDRSTRLEN);
port = ntohs(addrin->sin_port);
} else if (family == AF_INET6) {
struct sockaddr_in6* addrin6 = (struct sockaddr_in6*)&address;
uv_inet_ntop(AF_INET6, &(addrin6->sin6_addr), ip, INET6_ADDRSTRLEN);
port = ntohs(addrin6->sin6_port);
}
return scope.Close(sockname);
sockname->Set(port_symbol, Integer::New(port));
sockname->Set(family_symbol, Integer::New(family));
sockname->Set(address_symbol, String::New(ip));
}
static Handle<Value> GetPeerName(const Arguments& args) {
HandleScope scope;
struct sockaddr_storage address;
int family;
int port;
char ip[INET6_ADDRSTRLEN];
UNWRAP
int addrlen = sizeof(address);
int r = uv_tcp_getpeername(&wrap->handle_,
reinterpret_cast<sockaddr*>(&address),
&addrlen);
Local<Object> sockname = Object::New();
if (r != 0) {
SetErrno(uv_last_error(uv_default_loop()).code);
} else {
family = address.ss_family;
if (family == AF_INET) {
struct sockaddr_in* addrin = (struct sockaddr_in*)&address;
uv_inet_ntop(AF_INET, &(addrin->sin_addr), ip, INET6_ADDRSTRLEN);
port = ntohs(addrin->sin_port);
} else if (family == AF_INET6) {
struct sockaddr_in6* addrin6 = (struct sockaddr_in6*)&address;
uv_inet_ntop(AF_INET6, &(addrin6->sin6_addr), ip, INET6_ADDRSTRLEN);
port = ntohs(addrin6->sin6_port);
}
sockname->Set(port_symbol, Integer::New(port));
sockname->Set(family_symbol, Integer::New(family));
sockname->Set(address_symbol, String::New(ip));
return scope.Close(sockname);
}
Handle<Value> TCPWrap::GetPeerName(const Arguments& args) {
HandleScope scope;
struct sockaddr_storage address;
int family;
int port;
char ip[INET6_ADDRSTRLEN];
UNWRAP
int addrlen = sizeof(address);
int r = uv_tcp_getpeername(&wrap->handle_,
reinterpret_cast<sockaddr*>(&address),
&addrlen);
Local<Object> sockname = Object::New();
if (r != 0) {
SetErrno(uv_last_error(uv_default_loop()).code);
} else {
family = address.ss_family;
if (family == AF_INET) {
struct sockaddr_in* addrin = (struct sockaddr_in*)&address;
uv_inet_ntop(AF_INET, &(addrin->sin_addr), ip, INET6_ADDRSTRLEN);
port = ntohs(addrin->sin_port);
} else if (family == AF_INET6) {
struct sockaddr_in6* addrin6 = (struct sockaddr_in6*)&address;
uv_inet_ntop(AF_INET6, &(addrin6->sin6_addr), ip, INET6_ADDRSTRLEN);
port = ntohs(addrin6->sin6_port);
}
return scope.Close(sockname);
sockname->Set(port_symbol, Integer::New(port));
sockname->Set(family_symbol, Integer::New(family));
sockname->Set(address_symbol, String::New(ip));
}
return scope.Close(sockname);
}
static Handle<Value> Bind(const Arguments& args) {
HandleScope scope;
UNWRAP
Handle<Value> TCPWrap::Bind(const Arguments& args) {
HandleScope scope;
String::AsciiValue ip_address(args[0]->ToString());
int port = args[1]->Int32Value();
UNWRAP
struct sockaddr_in address = uv_ip4_addr(*ip_address, port);
int r = uv_tcp_bind(&wrap->handle_, address);
String::AsciiValue ip_address(args[0]->ToString());
int port = args[1]->Int32Value();
// Error starting the tcp.
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
struct sockaddr_in address = uv_ip4_addr(*ip_address, port);
int r = uv_tcp_bind(&wrap->handle_, address);
return scope.Close(Integer::New(r));
}
// Error starting the tcp.
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
static Handle<Value> Bind6(const Arguments& args) {
HandleScope scope;
return scope.Close(Integer::New(r));
}
UNWRAP
String::AsciiValue ip6_address(args[0]->ToString());
int port = args[1]->Int32Value();
Handle<Value> TCPWrap::Bind6(const Arguments& args) {
HandleScope scope;
struct sockaddr_in6 address = uv_ip6_addr(*ip6_address, port);
int r = uv_tcp_bind6(&wrap->handle_, address);
UNWRAP
// Error starting the tcp.
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
String::AsciiValue ip6_address(args[0]->ToString());
int port = args[1]->Int32Value();
return scope.Close(Integer::New(r));
}
struct sockaddr_in6 address = uv_ip6_addr(*ip6_address, port);
int r = uv_tcp_bind6(&wrap->handle_, address);
static Handle<Value> Listen(const Arguments& args) {
HandleScope scope;
// Error starting the tcp.
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
UNWRAP
return scope.Close(Integer::New(r));
}
int backlog = args[0]->Int32Value();
int r = uv_listen((uv_stream_t*)&wrap->handle_, backlog, OnConnection);
Handle<Value> TCPWrap::Listen(const Arguments& args) {
HandleScope scope;
// Error starting the tcp.
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
UNWRAP
return scope.Close(Integer::New(r));
}
int backlog = args[0]->Int32Value();
static void OnConnection(uv_stream_t* handle, int status) {
HandleScope scope;
int r = uv_listen((uv_stream_t*)&wrap->handle_, backlog, OnConnection);
TCPWrap* wrap = static_cast<TCPWrap*>(handle->data);
assert(&wrap->handle_ == (uv_tcp_t*)handle);
// Error starting the tcp.
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
// We should not be getting this callback if someone as already called
// uv_close() on the handle.
assert(wrap->object_.IsEmpty() == false);
return scope.Close(Integer::New(r));
}
Handle<Value> argv[1];
if (status == 0) {
// Instantiate the client javascript object and handle.
Local<Object> client_obj = tcpConstructor->NewInstance();
void TCPWrap::OnConnection(uv_stream_t* handle, int status) {
HandleScope scope;
// Unwrap the client javascript object.
assert(client_obj->InternalFieldCount() > 0);
TCPWrap* client_wrap =
static_cast<TCPWrap*>(client_obj->GetPointerFromInternalField(0));
TCPWrap* wrap = static_cast<TCPWrap*>(handle->data);
assert(&wrap->handle_ == (uv_tcp_t*)handle);
int r = uv_accept(handle, (uv_stream_t*)&client_wrap->handle_);
// We should not be getting this callback if someone as already called
// uv_close() on the handle.
assert(wrap->object_.IsEmpty() == false);
// uv_accept should always work.
assert(r == 0);
Handle<Value> argv[1];
// Successful accept. Call the onconnection callback in JavaScript land.
argv[0] = client_obj;
} else {
SetErrno(uv_last_error(uv_default_loop()).code);
argv[0] = v8::Null();
}
if (status == 0) {
// Instantiate the client javascript object and handle.
Local<Object> client_obj = Instantiate();
MakeCallback(wrap->object_, "onconnection", 1, argv);
}
// Unwrap the client javascript object.
assert(client_obj->InternalFieldCount() > 0);
TCPWrap* client_wrap =
static_cast<TCPWrap*>(client_obj->GetPointerFromInternalField(0));
static void AfterConnect(uv_connect_t* req, int status) {
ConnectWrap* req_wrap = (ConnectWrap*) req->data;
TCPWrap* wrap = (TCPWrap*) req->handle->data;
int r = uv_accept(handle, (uv_stream_t*)&client_wrap->handle_);
HandleScope scope;
// uv_accept should always work.
assert(r == 0);
// The wrap and request objects should still be there.
assert(req_wrap->object_.IsEmpty() == false);
assert(wrap->object_.IsEmpty() == false);
// Successful accept. Call the onconnection callback in JavaScript land.
argv[0] = client_obj;
} else {
SetErrno(uv_last_error(uv_default_loop()).code);
argv[0] = v8::Null();
}
if (status) {
SetErrno(uv_last_error(uv_default_loop()).code);
}
MakeCallback(wrap->object_, "onconnection", 1, argv);
}
Local<Value> argv[3] = {
Integer::New(status),
Local<Value>::New(wrap->object_),
Local<Value>::New(req_wrap->object_)
};
MakeCallback(req_wrap->object_, "oncomplete", 3, argv);
void TCPWrap::AfterConnect(uv_connect_t* req, int status) {
ConnectWrap* req_wrap = (ConnectWrap*) req->data;
TCPWrap* wrap = (TCPWrap*) req->handle->data;
delete req_wrap;
HandleScope scope;
// The wrap and request objects should still be there.
assert(req_wrap->object_.IsEmpty() == false);
assert(wrap->object_.IsEmpty() == false);
if (status) {
SetErrno(uv_last_error(uv_default_loop()).code);
}
static Handle<Value> Connect(const Arguments& args) {
HandleScope scope;
Local<Value> argv[3] = {
Integer::New(status),
Local<Value>::New(wrap->object_),
Local<Value>::New(req_wrap->object_)
};
UNWRAP
MakeCallback(req_wrap->object_, "oncomplete", 3, argv);
String::AsciiValue ip_address(args[0]->ToString());
int port = args[1]->Int32Value();
delete req_wrap;
}
struct sockaddr_in address = uv_ip4_addr(*ip_address, port);
// I hate when people program C++ like it was C, and yet I do it too.
// I'm too lazy to come up with the perfect class hierarchy here. Let's
// just do some type munging.
ConnectWrap* req_wrap = new ConnectWrap();
int r = uv_tcp_connect(&req_wrap->req_, &wrap->handle_, address,
AfterConnect);
Handle<Value> TCPWrap::Connect(const Arguments& args) {
HandleScope scope;
req_wrap->Dispatched();
UNWRAP
if (r) {
SetErrno(uv_last_error(uv_default_loop()).code);
delete req_wrap;
return scope.Close(v8::Null());
} else {
return scope.Close(req_wrap->object_);
}
}
String::AsciiValue ip_address(args[0]->ToString());
int port = args[1]->Int32Value();
static Handle<Value> Connect6(const Arguments& args) {
HandleScope scope;
struct sockaddr_in address = uv_ip4_addr(*ip_address, port);
UNWRAP
// I hate when people program C++ like it was C, and yet I do it too.
// I'm too lazy to come up with the perfect class hierarchy here. Let's
// just do some type munging.
ConnectWrap* req_wrap = new ConnectWrap();
int r = uv_tcp_connect(&req_wrap->req_, &wrap->handle_, address,
AfterConnect);
String::AsciiValue ip_address(args[0]->ToString());
int port = args[1]->Int32Value();
req_wrap->Dispatched();
struct sockaddr_in6 address = uv_ip6_addr(*ip_address, port);
if (r) {
SetErrno(uv_last_error(uv_default_loop()).code);
delete req_wrap;
return scope.Close(v8::Null());
} else {
return scope.Close(req_wrap->object_);
}
}
ConnectWrap* req_wrap = new ConnectWrap();
int r = uv_tcp_connect6(&req_wrap->req_, &wrap->handle_, address,
AfterConnect);
Handle<Value> TCPWrap::Connect6(const Arguments& args) {
HandleScope scope;
req_wrap->Dispatched();
UNWRAP
if (r) {
SetErrno(uv_last_error(uv_default_loop()).code);
delete req_wrap;
return scope.Close(v8::Null());
} else {
return scope.Close(req_wrap->object_);
}
}
String::AsciiValue ip_address(args[0]->ToString());
int port = args[1]->Int32Value();
struct sockaddr_in6 address = uv_ip6_addr(*ip_address, port);
ConnectWrap* req_wrap = new ConnectWrap();
uv_tcp_t handle_;
};
int r = uv_tcp_connect6(&req_wrap->req_, &wrap->handle_, address,
AfterConnect);
req_wrap->Dispatched();
if (r) {
SetErrno(uv_last_error(uv_default_loop()).code);
delete req_wrap;
return scope.Close(v8::Null());
} else {
return scope.Close(req_wrap->object_);
}
}
} // namespace node

37
src/tcp_wrap.h

@ -0,0 +1,37 @@
#ifndef TCP_WRAP_H_
#define TCP_WRAP_H_
#include <stream_wrap.h>
namespace node {
class TCPWrap : public StreamWrap {
public:
static v8::Local<v8::Object> Instantiate();
static TCPWrap* Unwrap(v8::Local<v8::Object> obj);
static void Initialize(v8::Handle<v8::Object> target);
private:
TCPWrap(v8::Handle<v8::Object> object);
~TCPWrap();
static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> GetSockName(const v8::Arguments& args);
static v8::Handle<v8::Value> GetPeerName(const v8::Arguments& args);
static v8::Handle<v8::Value> Bind(const v8::Arguments& args);
static v8::Handle<v8::Value> Bind6(const v8::Arguments& args);
static v8::Handle<v8::Value> Listen(const v8::Arguments& args);
static v8::Handle<v8::Value> Connect(const v8::Arguments& args);
static v8::Handle<v8::Value> Connect6(const v8::Arguments& args);
static v8::Handle<v8::Value> Open(const v8::Arguments& args);
static void OnConnection(uv_stream_t* handle, int status);
static void AfterConnect(uv_connect_t* req, int status);
uv_tcp_t handle_;
};
} // namespace node
#endif // TCP_WRAP_H_

0
test/simple/test-child-process-spawn-node.js → test/simple/test-child-process-fork.js

Loading…
Cancel
Save