Browse Source

More bindings, beginning tcp server code in js

v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
469e2648e5
  1. 45
      src/node_buffer.cc
  2. 45
      src/node_buffer.h
  3. 457
      src/node_net2.cc
  4. 55
      tcp.js

45
src/node_buffer.cc

@ -1,14 +1,15 @@
#include <node_buffer.h>
#include <assert.h>
#include <stdlib.h> // malloc, free
#include <v8.h>
#include <node.h>
namespace node {
using namespace v8;
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#define SLICE_ARGS(start_arg, end_arg) \
if (!start_arg->IsInt32() || !end_arg->IsInt32()) { \
return ThrowException(Exception::TypeError( \
@ -24,33 +25,11 @@ using namespace v8;
static Persistent<String> length_symbol;
static Persistent<FunctionTemplate> constructor_template;
/* A buffer is a chunk of memory stored outside the V8 heap, mirrored by an
* object in javascript. The object is not totally opaque, one can access
* individual bytes with [] and slice it into substrings or sub-buffers
* without copying memory.
*
* // return an ascii encoded string - no memory iscopied
* buffer.asciiSlide(0, 3)
*
* // returns another buffer - no memory is copied
* buffer.slice(0, 3)
*
* Interally, each javascript buffer object is backed by a "struct buffer"
* object. These "struct buffer" objects are either a root buffer (in the
* case that buffer->root == NULL) or slice objects (in which case
* buffer->root != NULL). A root buffer is only GCed once all its slices
* are GCed.
*/
struct buffer {
Persistent<Object> handle; // both
bool weak; // both
struct buffer *root; // both (NULL for root)
size_t offset; // both (0 for root)
size_t length; // both
unsigned int refs; // root only
char bytes[1]; // root only
};
bool IsBuffer(v8::Handle<v8::Value> val) {
if (!val->IsObject()) return false;
Local<Object> obj = val->ToObject();
return constructor_template->HasInstance(obj);
}
static inline struct buffer* buffer_root(buffer *buffer) {
@ -79,7 +58,7 @@ static inline void buffer_unref(struct buffer *buffer) {
}
static inline struct buffer* Unwrap(Handle<Value> val) {
struct buffer* BufferUnwrap(v8::Handle<v8::Value> val) {
assert(val->IsObject());
HandleScope scope;
Local<Object> obj = val->ToObject();
@ -123,7 +102,7 @@ static Handle<Value> Constructor(const Arguments &args) {
// slice slice
SLICE_ARGS(args[1], args[2])
struct buffer *parent = Unwrap(args[0]);
struct buffer *parent = BufferUnwrap(args[0]);
size_t start_abs = buffer_abs_off(parent, start);
size_t end_abs = buffer_abs_off(parent, end);
@ -230,7 +209,7 @@ static Handle<Value> AsciiSlice(const Arguments &args) {
SLICE_ARGS(args[0], args[1])
assert(args.This()->InternalFieldCount() == 1);
struct buffer *parent = Unwrap(args.This());
struct buffer *parent = BufferUnwrap(args.This());
size_t start_abs = buffer_abs_off(parent, start);
size_t end_abs = buffer_abs_off(parent, end);
@ -251,7 +230,7 @@ static Handle<Value> Utf8Slice(const Arguments &args) {
SLICE_ARGS(args[0], args[1])
struct buffer *parent = Unwrap(args.This());
struct buffer *parent = BufferUnwrap(args.This());
size_t start_abs = buffer_abs_off(parent, start);
size_t end_abs = buffer_abs_off(parent, end);
assert(start_abs <= end_abs);

45
src/node_buffer.h

@ -5,8 +5,53 @@
namespace node {
#define MIN(a,b) ((a) < (b) ? (a) : (b))
/* A buffer is a chunk of memory stored outside the V8 heap, mirrored by an
* object in javascript. The object is not totally opaque, one can access
* individual bytes with [] and slice it into substrings or sub-buffers
* without copying memory.
*
* // return an ascii encoded string - no memory iscopied
* buffer.asciiSlide(0, 3)
*
* // returns another buffer - no memory is copied
* buffer.slice(0, 3)
*
* Interally, each javascript buffer object is backed by a "struct buffer"
* object. These "struct buffer" objects are either a root buffer (in the
* case that buffer->root == NULL) or slice objects (in which case
* buffer->root != NULL). A root buffer is only GCed once all its slices
* are GCed.
*/
struct buffer {
v8::Persistent<v8::Object> handle; // both
bool weak; // both
struct buffer *root; // both (NULL for root)
size_t offset; // both (0 for root)
size_t length; // both
unsigned int refs; // root only
char bytes[1]; // root only
};
void InitBuffer(v8::Handle<v8::Object> target);
struct buffer* BufferUnwrap(v8::Handle<v8::Value> val);
bool IsBuffer(v8::Handle<v8::Value> val);
static inline char * buffer_p(struct buffer *buffer, size_t off) {
struct buffer *root = buffer->root ? buffer->root : buffer;
if (buffer->offset + off >= root->length) return NULL;
return reinterpret_cast<char*>(&(root->bytes) + buffer->offset + off);
}
static inline size_t buffer_remaining(struct buffer *buffer, size_t off) {
struct buffer *root = buffer->root ? buffer->root : buffer;
char *end = reinterpret_cast<char*>(&(root->bytes) + root->length);
return end - buffer_p(buffer, off);
}
}
#endif // NODE_BUFFER

457
src/node_net2.cc

@ -2,6 +2,7 @@
#include <v8.h>
#include <node.h>
#include <node_buffer.h>
#include <string.h>
@ -14,8 +15,6 @@
#include <errno.h>
namespace node {
using namespace v8;
@ -23,7 +22,20 @@ using namespace v8;
static Persistent<String> errno_symbol;
static Persistent<String> syscall_symbol;
static inline Local<Value> ErrnoException(int errorno, const char *syscall, const char *msg = "") {
static Persistent<String> fd_symbol;
static Persistent<String> remote_address_symbol;
static Persistent<String> remote_port_symbol;
#define FD_ARG(a) \
if (!(a)->IsInt32()) { \
return ThrowException(Exception::TypeError( \
String::New("Bad file descriptor argument"))); \
} \
int fd = (a)->Int32Value();
static inline Local<Value> ErrnoException(int errorno,
const char *syscall,
const char *msg = "") {
if (!msg[0]) msg = strerror(errorno);
Local<Value> e = Exception::Error(String::NewSymbol(msg));
Local<Object> obj = e->ToObject();
@ -32,6 +44,57 @@ static inline Local<Value> ErrnoException(int errorno, const char *syscall, cons
return e;
}
static inline bool SetNonBlock(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) return false;
flags |= O_NONBLOCK;
return (fcntl(fd, F_SETFL, flags) != -1);
}
// Creates nonblocking pipe
static Handle<Value> Pipe(const Arguments& args) {
HandleScope scope;
int fds[2];
if (pipe(fds) < 0) return ThrowException(ErrnoException(errno, "pipe"));
if(!SetNonBlock(fds[0]) || !SetNonBlock(fds[1])) {
int fcntl_errno = errno;
close(fds[0]);
close(fds[1]);
return ThrowException(ErrnoException(fcntl_errno, "fcntl"));
}
Local<Array> a = Array::New(2);
a->Set(Integer::New(0), Integer::New(fds[0]));
a->Set(Integer::New(1), Integer::New(fds[1]));
return scope.Close(a);
}
// Creates nonblocking socket pair
static Handle<Value> SocketPair(const Arguments& args) {
HandleScope scope;
int fds[2];
// XXX support SOCK_DGRAM?
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
return ThrowException(ErrnoException(errno, "socketpair"));
}
if (!SetNonBlock(fds[0]) || !SetNonBlock(fds[1])) {
int fcntl_errno = errno;
close(fds[0]);
close(fds[1]);
return ThrowException(ErrnoException(fcntl_errno, "fcntl"));
}
Local<Array> a = Array::New(2);
a->Set(Integer::New(0), Integer::New(fds[0]));
a->Set(Integer::New(1), Integer::New(fds[1]));
return scope.Close(a);
}
// Creates a new non-blocking socket fd
// t.socket("TCP");
// t.socket("UNIX");
@ -64,19 +127,8 @@ static Handle<Value> Socket(const Arguments& args) {
if (fd < 0) return ThrowException(ErrnoException(errno, "socket"));
int fcntl_errno;
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
fcntl_errno = errno;
close(fd);
return ThrowException(ErrnoException(fcntl_errno, "fcntl"));
}
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) == -1) {
fcntl_errno = errno;
if (!SetNonBlock(fd)) {
int fcntl_errno = errno;
close(fd);
return ThrowException(ErrnoException(fcntl_errno, "fcntl"));
}
@ -84,39 +136,26 @@ static Handle<Value> Socket(const Arguments& args) {
return scope.Close(Integer::New(fd));
}
// 2 arguments means connect with unix
// t.connect(fd, "/tmp/socket")
//
// 3 arguments means connect with TCP or UDP
// t.connect(fd, "127.0.0.1", 80)
static Handle<Value> Connect(const Arguments& args) {
HandleScope scope;
if (!args[0]->IsInt32()) {
return ThrowException(Exception::TypeError(
String::New("First argument should be file descriptor")));
}
if (args.Length() < 2) {
return ThrowException(Exception::TypeError(
String::New("Must have at least two args")));
}
int fd = args[0]->Int32Value();
struct sockaddr *addr;
socklen_t addrlen;
if (args.Length() == 2) {
// NOT AT ALL THREAD SAFE - but that's okay for node.js
// (yes this is all to avoid one small heap alloc)
static struct sockaddr *addr;
static socklen_t addrlen;
static struct sockaddr_un un;
static struct sockaddr_in6 in6;
static inline Handle<Value> ParseAddressArgs(Handle<Value> first,
Handle<Value> second,
struct in6_addr default_addr
) {
if (first->IsString() && second->IsUndefined()) {
// UNIX
String::Utf8Value path(args[1]->ToString());
struct sockaddr_un un = {0};
String::Utf8Value path(first->ToString());
if (path.length() > sizeof un.sun_path) {
return ThrowException(Exception::Error(String::New("Socket path too long")));
return Exception::Error(String::New("Socket path too long"));
}
memset(&un, 0, sizeof un);
un.sun_family = AF_UNIX;
strcpy(un.sun_path, *path);
@ -125,27 +164,29 @@ static Handle<Value> Connect(const Arguments& args) {
} else {
// TCP or UDP
String::Utf8Value ip(args[1]->ToString());
int port = args[2]->Int32Value();
struct sockaddr_in6 in6 = {0};
char ipv6[255] = "::FFFF:";
if (inet_pton(AF_INET, *ip, &(in6.sin6_addr)) > 0) {
// If this is an IPv4 address then we need to change it to the
// IPv4-mapped-on-IPv6 format which looks like
// ::FFFF:<IPv4 address>
// For more information see "Address Format" ipv6(7) and "BUGS" in
// inet_pton(3)
strcat(ipv6, *ip);
int port = first->Int32Value();
memset(&in6, 0, sizeof in6);
if (!second->IsString()) {
in6.sin6_addr = default_addr;
} else {
strcpy(ipv6, *ip);
}
if (inet_pton(AF_INET6, ipv6, &(in6.sin6_addr)) <= 0) {
return ThrowException(
ErrnoException(errno, "inet_pton", "Invalid IP Address"));
String::Utf8Value ip(second->ToString());
char ipv6[255] = "::FFFF:";
if (inet_pton(AF_INET, *ip, &(in6.sin6_addr)) > 0) {
// If this is an IPv4 address then we need to change it
// to the IPv4-mapped-on-IPv6 format which looks like
// ::FFFF:<IPv4 address>
// For more information see "Address Format" ipv6(7) and
// "BUGS" in inet_pton(3)
strcat(ipv6, *ip);
} else {
strcpy(ipv6, *ip);
}
if (inet_pton(AF_INET6, ipv6, &(in6.sin6_addr)) <= 0) {
return ErrnoException(errno, "inet_pton", "Invalid IP Address");
}
}
in6.sin6_family = AF_INET6;
@ -154,6 +195,106 @@ static Handle<Value> Connect(const Arguments& args) {
addr = (struct sockaddr*)&in6;
addrlen = sizeof in6;
}
return Handle<Value>();
}
// Bind with UNIX
// t.bind(fd, "/tmp/socket")
// Bind with TCP
// t.bind(fd, 80, "192.168.11.2")
// t.bind(fd, 80)
static Handle<Value> Bind(const Arguments& args) {
HandleScope scope;
if (args.Length() < 2) {
return ThrowException(Exception::TypeError(
String::New("Must have at least two args")));
}
FD_ARG(args[0])
Handle<Value> error = ParseAddressArgs(args[1], args[2], in6addr_any);
if (!error.IsEmpty()) return ThrowException(error);
int flags = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
int r = bind(fd, addr, addrlen);
if (r < 0) {
return ThrowException(ErrnoException(errno, "bind"));
}
return Undefined();
}
static Handle<Value> Close(const Arguments& args) {
HandleScope scope;
FD_ARG(args[0])
if (0 > close(fd)) {
return ThrowException(ErrnoException(errno, "close"));
}
return Undefined();
}
// t.shutdown(fd, "read"); -- SHUT_RD
// t.shutdown(fd, "write"); -- SHUT_WR
// t.shutdown(fd, "readwrite"); -- SHUT_RDWR
// second arg defaults to "write".
static Handle<Value> Shutdown(const Arguments& args) {
HandleScope scope;
FD_ARG(args[0])
int how = SHUT_WR;
if (args[1]->IsString()) {
String::Utf8Value t(args[0]->ToString());
if (0 == strcasecmp(*t, "write")) {
how = SHUT_WR;
} else if (0 == strcasecmp(*t, "read")) {
how = SHUT_RD;
} else if (0 == strcasecmp(*t, "readwrite")) {
how = SHUT_RDWR;
} else {
return ThrowException(Exception::Error(String::New(
"Unknown shutdown method. (Use 'read', 'write', or 'readwrite'.)")));
}
}
if (0 > shutdown(fd, how)) {
return ThrowException(ErrnoException(errno, "shutdown"));
}
return Undefined();
}
// Connect with unix
// t.connect(fd, "/tmp/socket")
//
// Connect with TCP or UDP
// t.connect(fd, 80, "192.168.11.2")
// t.connect(fd, 80, "::1")
// t.connect(fd, 80)
// the third argument defaults to "::1"
static Handle<Value> Connect(const Arguments& args) {
HandleScope scope;
if (args.Length() < 2) {
return ThrowException(Exception::TypeError(
String::New("Must have at least two args")));
}
FD_ARG(args[0])
Handle<Value> error = ParseAddressArgs(args[1], args[2], in6addr_loopback);
if (!error.IsEmpty()) return ThrowException(error);
int r = connect(fd, addr, addrlen);
@ -165,14 +306,204 @@ static Handle<Value> Connect(const Arguments& args) {
}
static Handle<Value> Listen(const Arguments& args) {
HandleScope scope;
FD_ARG(args[0])
int backlog = args[1]->IsInt32() ? args[1]->Int32Value() : 128;
if (0 > listen(fd, backlog)) {
return ThrowException(ErrnoException(errno, "listen"));
}
return Undefined();
}
// var peerInfo = t.accept(server_fd);
//
// peerInfo.fd
// peerInfo.remoteAddress
// peerInfo.remotePort
//
// Returns a new nonblocking socket fd. If the listen queue is empty the
// function returns null (wait for server_fd to become readable and try
// again)
static Handle<Value> Accept(const Arguments& args) {
HandleScope scope;
FD_ARG(args[0])
struct sockaddr_storage addr;
socklen_t len;
int peer = accept(fd, (struct sockaddr*) &addr, &len);
if (peer < 0) {
if (errno == EAGAIN) return Null();
return ThrowException(ErrnoException(errno, "accept"));
}
if (!SetNonBlock(peer)) {
int fcntl_errno = errno;
close(peer);
return ThrowException(ErrnoException(fcntl_errno, "fcntl"));
}
Local<Object> peer_info = Object::New();
peer_info->Set(fd_symbol, Integer::New(fd));
if (addr.ss_family == AF_INET6) {
struct sockaddr_in6 *a = reinterpret_cast<struct sockaddr_in6*>(&addr);
char ip[INET6_ADDRSTRLEN];
inet_ntop(AF_INET6, &a->sin6_addr, ip, INET6_ADDRSTRLEN);
int port = ntohs(a->sin6_port);
peer_info->Set(remote_address_symbol, String::New(ip));
peer_info->Set(remote_port_symbol, Integer::New(port));
}
return scope.Close(peer_info);
}
static Handle<Value> GetSocketError(const Arguments& args) {
HandleScope scope;
FD_ARG(args[0])
int error;
socklen_t len = sizeof(int);
int r = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len);
if (r < 0) {
return ThrowException(ErrnoException(errno, "getsockopt"));
}
return scope.Close(Integer::New(error));
}
// var bytesRead = t.read(fd, buffer, offset, length);
// returns null on EAGAIN or EINTR, raises an exception on all other errors
// returns 0 on EOF.
static Handle<Value> Read(const Arguments& args) {
HandleScope scope;
if (args.Length() < 4) {
return ThrowException(Exception::TypeError(
String::New("Takes 4 parameters")));
}
FD_ARG(args[0])
if (!IsBuffer(args[1])) {
return ThrowException(Exception::TypeError(
String::New("Second argument should be a buffer")));
}
struct buffer * buffer = BufferUnwrap(args[1]);
size_t off = args[2]->Int32Value();
if (buffer_p(buffer, off) == NULL) {
return ThrowException(Exception::Error(
String::New("Offset is out of bounds")));
}
size_t len = args[3]->Int32Value();
if (buffer_remaining(buffer, off) < len) {
return ThrowException(Exception::Error(
String::New("Length is extends beyond buffer")));
}
size_t bytes_read = read(fd,
buffer_p(buffer, off),
buffer_remaining(buffer, off));
if (bytes_read < 0) {
if (errno == EAGAIN || errno == EINTR) return Null();
return ThrowException(ErrnoException(errno, "read"));
}
return Integer::New(bytes_read);
}
// var bytesWritten = t.write(fd, buffer, offset, length);
// returns null on EAGAIN or EINTR, raises an exception on all other errors
static Handle<Value> Write(const Arguments& args) {
HandleScope scope;
if (args.Length() < 4) {
return ThrowException(Exception::TypeError(
String::New("Takes 4 parameters")));
}
FD_ARG(args[0])
if (!IsBuffer(args[1])) {
return ThrowException(Exception::TypeError(
String::New("Second argument should be a buffer")));
}
struct buffer * buffer = BufferUnwrap(args[1]);
size_t off = args[2]->Int32Value();
if (buffer_p(buffer, off) == NULL) {
return ThrowException(Exception::Error(
String::New("Offset is out of bounds")));
}
size_t len = args[3]->Int32Value();
if (buffer_remaining(buffer, off) < len) {
return ThrowException(Exception::Error(
String::New("Length is extends beyond buffer")));
}
size_t written = write(fd,
buffer_p(buffer, off),
buffer_remaining(buffer, off));
if (written < 0) {
if (errno == EAGAIN || errno == EINTR) return Null();
return ThrowException(ErrnoException(errno, "write"));
}
return Integer::New(written);
}
void InitNet2(Handle<Object> target) {
HandleScope scope;
NODE_SET_METHOD(target, "write", Write);
NODE_SET_METHOD(target, "read", Read);
NODE_SET_METHOD(target, "socket", Socket);
NODE_SET_METHOD(target, "close", Close);
NODE_SET_METHOD(target, "shutdown", Shutdown);
NODE_SET_METHOD(target, "pipe", Pipe);
NODE_SET_METHOD(target, "socketpair", SocketPair);
NODE_SET_METHOD(target, "connect", Connect);
NODE_SET_METHOD(target, "bind", Bind);
NODE_SET_METHOD(target, "listen", Listen);
NODE_SET_METHOD(target, "accept", Accept);
NODE_SET_METHOD(target, "getSocketError", GetSocketError);
target->Set(String::NewSymbol("EINPROGRESS"), Integer::New(EINPROGRESS));
target->Set(String::NewSymbol("EINTR"), Integer::New(EINTR));
target->Set(String::NewSymbol("EACCES"), Integer::New(EACCES));
target->Set(String::NewSymbol("EPERM"), Integer::New(EPERM));
target->Set(String::NewSymbol("EADDRINUSE"), Integer::New(EADDRINUSE));
target->Set(String::NewSymbol("ECONNREFUSED"), Integer::New(ECONNREFUSED));
errno_symbol = NODE_PSYMBOL("errno");
syscall_symbol = NODE_PSYMBOL("syscall");
fd_symbol = NODE_PSYMBOL("fd");
remote_address_symbol = NODE_PSYMBOL("remoteAddress");
remote_port_symbol = NODE_PSYMBOL("remotePort");
}
} // namespace node

55
tcp.js

@ -0,0 +1,55 @@
var socket = process.socket;
var bind = process.bind;
var listen = process.listen;
var accept = process.accept;
var close = process.close;
var Server = function (listener) {
var self = this;
if (listener) {
self.addListener("connection", listener);
}
};
process.inherits(Server, process.EventEmitter);
Server.prototype.listen = function (port, host) {
var self = this;
if (self.fd) throw new Error("Already running");
self.fd = process.socket("TCP");
// TODO dns resolution
bind(self.fd, port, host);
listen(self.fd, 128); // TODO configurable backlog
self.watcher = new process.IOWatcher(self.fd, true, false, function () {
var peerInfo;
while (self.fd) {
peerInfo = accept(self.fd);
if (peerInfo === null) return;
self.emit("connection", peerInfo);
}
});
self.watcher.start();
};
Server.prototype.close = function () {
var self = this;
if (!self.fd) throw new Error("Not running");
self.watcher.stop();
close(self.fd);
this.watcher = null;
this.fd = null;
};
///////////////////////////////////////////////////////
var sys = require("sys");
var server = new Server(function () {
sys.puts("connection");
server.close();
});
server.listen(8000);
Loading…
Cancel
Save