Browse Source

Reference counting. Network bugs.

Connections were being garbage collected while they were still in progress
since the object would leave scope. This commit adds ObjectWrap::Attach()
and ObjectWrap::Detach() to tell v8 that an object is currently on the event
loop and will be needed in the future.

Other changes to oi_socket.c and net.cc are to fix bugs encountered while
running the HTTP server.
v0.7.4-release
Ryan 16 years ago
parent
commit
cfd61622ae
  1. 58
      deps/liboi/oi_socket.c
  2. 32
      src/file.cc
  3. 16
      src/net.cc
  4. 5
      src/net.h
  5. 33
      src/node.cc
  6. 16
      src/node.h
  7. 15
      src/timer.cc

58
deps/liboi/oi_socket.c

@ -47,13 +47,13 @@
#if HAVE_GNUTLS #if HAVE_GNUTLS
# include <gnutls/gnutls.h> # include <gnutls/gnutls.h>
#endif // HAVE_GNUTLS
/* a few forwards /* a few forwards
* they wont even be defined if not having gnutls * they wont even be defined if not having gnutls
* */ * */
static int secure_full_goodbye (oi_socket *socket); static int secure_full_goodbye (oi_socket *socket);
static int secure_half_goodbye (oi_socket *socket); static int secure_half_goodbye (oi_socket *socket);
#endif // HAVE_GNUTLS
#undef TRUE #undef TRUE
#define TRUE 1 #define TRUE 1
@ -99,23 +99,21 @@ oi_buf_new (const char *base, size_t len)
return buf; return buf;
} }
#define CLOSE_ASAP(socket) do { \ #define CLOSE_ASAP(socket) do { \
if ((socket)->read_action) { \ (socket)->read_action = full_close; \
(socket)->read_action = full_close; \ (socket)->write_action = full_close; \
} \
if ((socket)->write_action) { \
(socket)->write_action = full_close; \
} \
} while (0) } while (0)
static int static int
full_close(oi_socket *socket) full_close(oi_socket *socket)
{ {
//printf("close(%d)\n", socket->fd);
if (close(socket->fd) == -1) if (close(socket->fd) == -1)
return errno == EINTR ? AGAIN : ERROR; return errno == EINTR ? AGAIN : ERROR;
socket->read_action = NULL; socket->read_action = NULL;
socket->write_action = NULL; socket->write_action = NULL;
socket->fd = -1;
return OKAY; return OKAY;
} }
@ -124,9 +122,17 @@ half_close(oi_socket *socket)
{ {
int r = shutdown(socket->fd, SHUT_WR); int r = shutdown(socket->fd, SHUT_WR);
if (r == -1) { if (r == -1) {
socket->errorno = errno; switch (errno) {
assert(0 && "Shouldn't get an error on shutdown"); case ENOTCONN:
return ERROR; socket->errorno = errno;
return ERROR;
default:
perror("shutdown()");
socket->errorno = errno;
assert(0 && "Shouldn't get an error on shutdown");
return ERROR;
}
} }
socket->write_action = NULL; socket->write_action = NULL;
return OKAY; return OKAY;
@ -141,6 +147,9 @@ change_state_for_empty_out_stream (oi_socket *socket)
* a very complicated bunch of close logic! * a very complicated bunch of close logic!
* XXX this is awful. FIXME * XXX this is awful. FIXME
*/ */
if (socket->write_action == full_close || socket->read_action == full_close)
return;
if (socket->got_half_close == FALSE) { if (socket->got_half_close == FALSE) {
if (socket->got_full_close == FALSE) { if (socket->got_full_close == FALSE) {
/* Normal situation. Didn't get any close signals. */ /* Normal situation. Didn't get any close signals. */
@ -244,10 +253,10 @@ secure_handshake(oi_socket *socket)
if (socket->on_connect) socket->on_connect(socket); if (socket->on_connect) socket->on_connect(socket);
} }
if (socket->read_action) if (socket->read_action == secure_handshake)
socket->read_action = secure_socket_recv; socket->read_action = secure_socket_recv;
if (socket->write_action) if (socket->write_action == secure_handshake)
socket->write_action = secure_socket_send; socket->write_action = secure_socket_send;
return OKAY; return OKAY;
@ -340,8 +349,11 @@ secure_socket_recv(oi_socket *socket)
if (recved >= 0) { if (recved >= 0) {
/* Got EOF */ /* Got EOF */
if (recved == 0) if (recved == 0) {
socket->read_action = NULL; socket->read_action = NULL;
if (socket->write_action == NULL)
CLOSE_ASAP(socket);
}
if (socket->write_action) if (socket->write_action)
socket->write_action = secure_socket_send; socket->write_action = secure_socket_send;
@ -454,6 +466,7 @@ socket_send (oi_socket *socket)
default: default:
perror("send()"); perror("send()");
printf("%p had send error\n", socket);
assert(0 && "oi shouldn't let this happen."); assert(0 && "oi shouldn't let this happen.");
socket->errorno = errno; socket->errorno = errno;
return ERROR; return ERROR;
@ -509,7 +522,7 @@ socket_recv (oi_socket *socket)
default: default:
perror("recv()"); perror("recv()");
printf("unmatched errno %d\n", errno); printf("unmatched errno %d %s\n\n", errno, strerror(errno));
assert(0 && "recv returned error that oi should have caught before."); assert(0 && "recv returned error that oi should have caught before.");
return ERROR; return ERROR;
} }
@ -520,6 +533,8 @@ socket_recv (oi_socket *socket)
if (recved == 0) { if (recved == 0) {
oi_socket_read_stop(socket); oi_socket_read_stop(socket);
socket->read_action = NULL; socket->read_action = NULL;
if (socket->write_action == NULL)
CLOSE_ASAP(socket);
} }
/* NOTE: EOF is signaled with recved == 0 on callback */ /* NOTE: EOF is signaled with recved == 0 on callback */
@ -860,8 +875,19 @@ oi_socket_full_close (oi_socket *socket)
void oi_socket_force_close (oi_socket *socket) void oi_socket_force_close (oi_socket *socket)
{ {
release_write_buffer(socket);
ev_clear_pending (EV_A_ &socket->write_watcher);
ev_clear_pending (EV_A_ &socket->read_watcher);
ev_clear_pending (EV_A_ &socket->timeout_watcher);
socket->write_action = socket->read_action = NULL;
// socket->errorno = OI_SOCKET_ERROR_FORCE_CLOSE // socket->errorno = OI_SOCKET_ERROR_FORCE_CLOSE
CLOSE_ASAP(socket); close(socket->fd);
socket->fd = -1;
oi_socket_detach(socket);
} }
void void

32
src/file.cc

@ -105,7 +105,7 @@ FileSystem::Rename (const Arguments& args)
String::Utf8Value new_path(args[1]->ToString()); String::Utf8Value new_path(args[1]->ToString());
node::eio_warmup(); node::eio_warmup();
eio_req *req = eio_rename(*path, *new_path, EIO_PRI_DEFAULT, AfterRename, NULL); eio_rename(*path, *new_path, EIO_PRI_DEFAULT, AfterRename, NULL);
return Undefined(); return Undefined();
} }
@ -132,7 +132,7 @@ FileSystem::Stat (const Arguments& args)
String::Utf8Value path(args[0]->ToString()); String::Utf8Value path(args[0]->ToString());
node::eio_warmup(); node::eio_warmup();
eio_req *req = eio_stat(*path, EIO_PRI_DEFAULT, AfterStat, NULL); eio_stat(*path, EIO_PRI_DEFAULT, AfterStat, NULL);
return Undefined(); return Undefined();
} }
@ -225,6 +225,7 @@ File::GetFD (void)
{ {
Handle<Value> fd_value = handle_->Get(FD_SYMBOL); Handle<Value> fd_value = handle_->Get(FD_SYMBOL);
int fd = fd_value->IntegerValue(); int fd = fd_value->IntegerValue();
return fd;
} }
Handle<Value> Handle<Value>
@ -237,8 +238,8 @@ File::Close (const Arguments& args)
int fd = file->GetFD(); int fd = file->GetFD();
node::eio_warmup(); node::eio_warmup();
eio_req *req = eio_close (fd, EIO_PRI_DEFAULT, File::AfterClose, file); eio_close (fd, EIO_PRI_DEFAULT, File::AfterClose, file);
file->Attach();
return Undefined(); return Undefined();
} }
@ -255,7 +256,7 @@ File::AfterClose (eio_req *req)
Local<Value> argv[argc]; Local<Value> argv[argc];
argv[0] = Integer::New(req->errorno); argv[0] = Integer::New(req->errorno);
CallTopCallback(file->handle_, argc, argv); CallTopCallback(file->handle_, argc, argv);
file->Detach();
return 0; return 0;
} }
@ -298,8 +299,8 @@ File::Open (const Arguments& args)
// TODO how should the mode be set? // TODO how should the mode be set?
node::eio_warmup(); node::eio_warmup();
eio_req *req = eio_open (*path, flags, 0666, EIO_PRI_DEFAULT, File::AfterOpen, file); eio_open (*path, flags, 0666, EIO_PRI_DEFAULT, File::AfterOpen, file);
file->Attach();
return Undefined(); return Undefined();
} }
@ -318,6 +319,7 @@ File::AfterOpen (eio_req *req)
argv[0] = Integer::New(req->errorno); argv[0] = Integer::New(req->errorno);
CallTopCallback(file->handle_, argc, argv); CallTopCallback(file->handle_, argc, argv);
file->Detach();
return 0; return 0;
} }
@ -346,7 +348,7 @@ File::Write (const Arguments& args)
Local<Array> array = Local<Array>::Cast(args[0]); Local<Array> array = Local<Array>::Cast(args[0]);
length = array->Length(); length = array->Length();
buf = static_cast<char*>(malloc(length)); buf = static_cast<char*>(malloc(length));
for (int i = 0; i < length; i++) { for (unsigned int i = 0; i < length; i++) {
Local<Value> int_value = array->Get(Integer::New(i)); Local<Value> int_value = array->Get(Integer::New(i));
buf[i] = int_value->Int32Value(); buf[i] = int_value->Int32Value();
} }
@ -366,8 +368,9 @@ File::Write (const Arguments& args)
int fd = file->GetFD(); int fd = file->GetFD();
node::eio_warmup(); node::eio_warmup();
eio_req *req = eio_write(fd, buf, length, pos, EIO_PRI_DEFAULT, File::AfterWrite, file); eio_write(fd, buf, length, pos, EIO_PRI_DEFAULT, File::AfterWrite, file);
file->Attach();
return Undefined(); return Undefined();
} }
@ -378,7 +381,7 @@ File::AfterWrite (eio_req *req)
//char *buf = static_cast<char*>(req->ptr2); //char *buf = static_cast<char*>(req->ptr2);
free(req->ptr2); free(req->ptr2);
size_t written = req->result; ssize_t written = req->result;
HandleScope scope; HandleScope scope;
@ -388,6 +391,7 @@ File::AfterWrite (eio_req *req)
argv[1] = written >= 0 ? Integer::New(written) : Integer::New(0); argv[1] = written >= 0 ? Integer::New(written) : Integer::New(0);
CallTopCallback(file->handle_, argc, argv); CallTopCallback(file->handle_, argc, argv);
file->Detach();
return 0; return 0;
} }
@ -407,9 +411,9 @@ File::Read (const Arguments& args)
// NOTE: NULL pointer tells eio to allocate it itself // NOTE: NULL pointer tells eio to allocate it itself
node::eio_warmup(); node::eio_warmup();
eio_req *req = eio_read(fd, NULL, length, pos, EIO_PRI_DEFAULT, File::AfterRead, file); eio_read(fd, NULL, length, pos, EIO_PRI_DEFAULT, File::AfterRead, file);
assert(req);
file->Attach();
return Undefined(); return Undefined();
} }
@ -436,13 +440,15 @@ File::AfterRead (eio_req *req)
} else { } else {
// raw encoding // raw encoding
Local<Array> array = Array::New(length); Local<Array> array = Array::New(length);
for (int i = 0; i < length; i++) { for (unsigned int i = 0; i < length; i++) {
array->Set(Integer::New(i), Integer::New(buf[i])); array->Set(Integer::New(i), Integer::New(buf[i]));
} }
argv[1] = array; argv[1] = array;
} }
} }
CallTopCallback(file->handle_, argc, argv); CallTopCallback(file->handle_, argc, argv);
file->Detach();
return 0; return 0;
} }

16
src/net.cc

@ -105,8 +105,13 @@ Connection::Connection (Handle<Object> handle, Handle<Function> protocol_class)
Connection::~Connection () Connection::~Connection ()
{ {
handle_->Delete(SEND_SYMBOL); static int i = 0;
Close(); if(socket_.fd > 0) {
printf("garbage collecting open Connection : %d\n", i++);
printf(" socket->read_action: %p\n", socket_.read_action);
printf(" socket->write_action: %p\n", socket_.write_action);
}
ForceClose();
} }
Local<Object> Local<Object>
@ -128,6 +133,8 @@ Connection::SetAcceptor (Handle<Object> acceptor_handle)
{ {
HandleScope scope; HandleScope scope;
handle_->Set(SERVER_SYMBOL, acceptor_handle); handle_->Set(SERVER_SYMBOL, acceptor_handle);
Attach();
} }
Handle<Value> Handle<Value>
@ -172,6 +179,9 @@ Connection::v8Connect (const Arguments& args)
, Connection::AfterResolve , Connection::AfterResolve
, connection , connection
); );
connection->Attach();
return Undefined(); return Undefined();
} }
@ -214,6 +224,7 @@ Connection::AfterResolve (eio_req *req)
} }
connection->OnDisconnect(); connection->OnDisconnect();
connection->Detach();
return 0; return 0;
} }
@ -242,6 +253,7 @@ Connection::v8ForceClose (const Arguments& args)
HandleScope scope; HandleScope scope;
Connection *connection = NODE_UNWRAP(Connection, args.Holder()); Connection *connection = NODE_UNWRAP(Connection, args.Holder());
connection->ForceClose(); connection->ForceClose();
//connection->Detach();
return Undefined(); return Undefined();
} }

5
src/net.h

@ -69,6 +69,7 @@ private:
static void on_close (oi_socket *s) { static void on_close (oi_socket *s) {
Connection *connection = static_cast<Connection*> (s->data); Connection *connection = static_cast<Connection*> (s->data);
connection->OnDisconnect(); connection->OnDisconnect();
connection->Detach();
} }
static void on_timeout (oi_socket *s) { static void on_timeout (oi_socket *s) {
@ -98,7 +99,7 @@ protected:
Acceptor (v8::Handle<v8::Object> handle, Acceptor (v8::Handle<v8::Object> handle,
v8::Handle<v8::Function> protocol_class, v8::Handle<v8::Function> protocol_class,
v8::Handle<v8::Object> options); v8::Handle<v8::Object> options);
virtual ~Acceptor () { Close(); } virtual ~Acceptor () { Close(); puts("acceptor gc'd!");}
v8::Local<v8::Function> GetProtocolClass (void); v8::Local<v8::Function> GetProtocolClass (void);
@ -106,11 +107,13 @@ protected:
int r = oi_server_listen (&server_, address); int r = oi_server_listen (&server_, address);
if(r != 0) return r; if(r != 0) return r;
oi_server_attach (EV_DEFAULT_ &server_); oi_server_attach (EV_DEFAULT_ &server_);
Attach();
return 0; return 0;
} }
void Close ( ) { void Close ( ) {
oi_server_close (&server_); oi_server_close (&server_);
Detach();
} }
virtual Connection* OnConnection (struct sockaddr *addr, socklen_t len); virtual Connection* OnConnection (struct sockaddr *addr, socklen_t len);

33
src/node.cc

@ -22,11 +22,9 @@ static int exit_code = 0;
ObjectWrap::~ObjectWrap ( ) ObjectWrap::~ObjectWrap ( )
{ {
if (!handle_.IsEmpty()) { handle_->SetInternalField(0, Undefined());
handle_->SetInternalField(0, Undefined()); handle_.Dispose();
handle_.Dispose(); handle_.Clear();
handle_.Clear();
}
} }
ObjectWrap::ObjectWrap (Handle<Object> handle) ObjectWrap::ObjectWrap (Handle<Object> handle)
@ -38,6 +36,25 @@ ObjectWrap::ObjectWrap (Handle<Object> handle)
Handle<External> external = External::New(this); Handle<External> external = External::New(this);
handle_->SetInternalField(0, external); handle_->SetInternalField(0, external);
handle_.MakeWeak(this, ObjectWrap::MakeWeak); handle_.MakeWeak(this, ObjectWrap::MakeWeak);
attach_count_ = 0;
weak_ = false;
}
void
ObjectWrap::Attach ()
{
attach_count_ += 1;
}
void
ObjectWrap::Detach ()
{
attach_count_ -= 1;
assert(attach_count_ >= 0);
if(weak_ && attach_count_ == 0)
delete this;
} }
void* void*
@ -52,8 +69,10 @@ ObjectWrap::Unwrap (v8::Handle<v8::Object> handle)
void void
ObjectWrap::MakeWeak (Persistent<Value> _, void *data) ObjectWrap::MakeWeak (Persistent<Value> _, void *data)
{ {
ObjectWrap *w = static_cast<ObjectWrap*> (data); ObjectWrap *obj = static_cast<ObjectWrap*> (data);
delete w; obj->weak_ = true;
if (obj->attach_count_ == 0)
delete obj;
} }
// Extracts a C string from a V8 Utf8Value. // Extracts a C string from a V8 Utf8Value.

16
src/node.h

@ -26,8 +26,24 @@ protected:
static void* Unwrap (v8::Handle<v8::Object> handle); static void* Unwrap (v8::Handle<v8::Object> handle);
v8::Persistent<v8::Object> handle_; v8::Persistent<v8::Object> handle_;
/* Attach() marks the object as being attached to an event loop.
* Attached objects will not be garbage collected, even if
* all references are lost.
*/
void Attach();
/* Detach() marks an object as detached from the event loop. This is its
* default state. When an object with a "weak" reference changes from
* attached to detached state it will be freed. Be careful not to access
* the object after making this call as it might be gone!
* (A "weak reference" is v8 terminology for an object who only has a
* persistant handle.)
*/
void Detach();
private: private:
static void MakeWeak (v8::Persistent<v8::Value> _, void *data); static void MakeWeak (v8::Persistent<v8::Value> _, void *data);
int attach_count_;
bool weak_;
}; };
} // namespace node } // namespace node

15
src/timer.cc

@ -3,8 +3,9 @@
#include <assert.h> #include <assert.h>
using namespace v8; using namespace v8;
using namespace node;
class Timer : node::ObjectWrap { class Timer : ObjectWrap {
public: public:
Timer(Handle<Object> handle, Handle<Function> callback, ev_tstamp after, ev_tstamp repeat); Timer(Handle<Object> handle, Handle<Function> callback, ev_tstamp after, ev_tstamp repeat);
~Timer(); ~Timer();
@ -34,7 +35,14 @@ Timer::OnTimeout (EV_P_ ev_timer *watcher, int revents)
TryCatch try_catch; TryCatch try_catch;
callback->Call (timer->handle_, 0, NULL); callback->Call (timer->handle_, 0, NULL);
if (try_catch.HasCaught()) if (try_catch.HasCaught())
node::fatal_exception(try_catch); fatal_exception(try_catch);
/* XXX i'm a bit worried if this is the correct test?
* it's rather crutial for memory leaks the conditional here test to see
* if the watcher will make another callback.
*/
if (false == ev_is_active(&timer->watcher_))
timer->Detach();
} }
Timer::Timer (Handle<Object> handle, Handle<Function> callback, ev_tstamp after, ev_tstamp repeat) Timer::Timer (Handle<Object> handle, Handle<Function> callback, ev_tstamp after, ev_tstamp repeat)
@ -48,6 +56,7 @@ Timer::Timer (Handle<Object> handle, Handle<Function> callback, ev_tstamp after,
watcher_.data = this; watcher_.data = this;
ev_timer_start(EV_DEFAULT_UC_ &watcher_); ev_timer_start(EV_DEFAULT_UC_ &watcher_);
Attach();
} }
Timer::~Timer () Timer::~Timer ()
@ -78,6 +87,7 @@ Timer::Start (const Arguments& args)
{ {
Timer *timer = NODE_UNWRAP(Timer, args.Holder()); Timer *timer = NODE_UNWRAP(Timer, args.Holder());
ev_timer_start(EV_DEFAULT_UC_ &timer->watcher_); ev_timer_start(EV_DEFAULT_UC_ &timer->watcher_);
timer->Attach();
return Undefined(); return Undefined();
} }
@ -86,6 +96,7 @@ Timer::Stop (const Arguments& args)
{ {
Timer *timer = NODE_UNWRAP(Timer, args.Holder()); Timer *timer = NODE_UNWRAP(Timer, args.Holder());
ev_timer_stop(EV_DEFAULT_UC_ &timer->watcher_); ev_timer_stop(EV_DEFAULT_UC_ &timer->watcher_);
timer->Detach();
return Undefined(); return Undefined();
} }

Loading…
Cancel
Save