Browse Source

Sync evcom after refactor; fix binding issues

v0.7.4-release
Ryan 15 years ago
parent
commit
3b0408ec1c
  1. 987
      deps/evcom/evcom.c
  2. 181
      deps/evcom/evcom.h
  3. 42
      deps/evcom/test/test.c
  4. 6
      src/http.js
  5. 75
      src/net.cc
  6. 23
      src/net.h
  7. 17
      test/mjsunit/test-http-cat.js

987
deps/evcom/evcom.c

File diff suppressed because it is too large

181
deps/evcom/evcom.h

@ -42,10 +42,10 @@ extern "C" {
# include <gnutls/gnutls.h>
#endif
typedef struct evcom_queue evcom_queue;
typedef struct evcom_buf evcom_buf;
typedef struct evcom_server evcom_server;
typedef struct evcom_stream evcom_stream;
/* The maximum evcom_stream will try to read in one callback */
#ifndef EVCOM_CHUNKSIZE
# define EVCOM_CHUNKSIZE (8*1024)
#endif
/* flags for stream and server */
#define EVCOM_ATTACHED 0x0001
@ -54,8 +54,91 @@ typedef struct evcom_stream evcom_stream;
#define EVCOM_SECURE 0x0008
#define EVCOM_GOT_HALF_CLOSE 0x0010
#define EVCOM_GOT_FULL_CLOSE 0x0020
#define EVCOM_TOO_MANY_CONN 0x0040
#define EVCOM_READ_PAUSED 0x0080
#define EVCOM_PAUSED 0x0040
#define EVCOM_READABLE 0x0080
#define EVCOM_WRITABLE 0x0100
#define EVCOM_GOT_WRITE_EVENT 0x0200
enum evcom_stream_state { EVCOM_INITIALIZED
, EVCOM_CONNECTING
, EVCOM_CONNECTED_RW /* read write */
, EVCOM_CONNECTED_RO /* read only */
, EVCOM_CONNECTED_WO /* write only */
, EVCOM_CLOSING
, EVCOM_CLOSED
};
typedef struct evcom_queue {
struct evcom_queue *prev;
struct evcom_queue *next;
} evcom_queue;
typedef struct evcom_buf {
/* public */
char *base;
size_t len;
void (*release) (struct evcom_buf *); /* called when oi is done with the object */
void *data;
/* private */
size_t written;
evcom_queue queue;
} evcom_buf;
#if EV_MULTIPLICITY
# define EVCOM_LOOP struct ev_loop *loop;
#else
# define EVCOM_LOOP
#endif
#define EVCOM_DESCRIPTOR(type) \
unsigned int flags; /* private */ \
int (*action) (struct evcom_descriptor*); /* private */ \
int errorno; /* read-only */ \
int fd; /* read-only */ \
EVCOM_LOOP /* read-only */ \
void *data; /* public */ \
void (*on_close) (struct type*); /* public */
typedef struct evcom_descriptor {
EVCOM_DESCRIPTOR(evcom_descriptor)
} evcom_descriptor;
typedef struct evcom_server {
EVCOM_DESCRIPTOR(evcom_server)
/* PRIVATE */
ev_io watcher;
/* PUBLIC */
struct evcom_stream*
(*on_connection)(struct evcom_server *, struct sockaddr *remote_addr);
} evcom_server;
typedef struct evcom_stream {
EVCOM_DESCRIPTOR(evcom_stream)
/* PRIVATE */
ev_io write_watcher;
ev_io read_watcher;
ev_timer timeout_watcher;
#if EVCOM_HAVE_GNUTLS
gnutls_session_t session;
#endif
/* READ-ONLY */
struct evcom_server *server;
evcom_queue out;
#if EVCOM_HAVE_GNUTLS
int gnutls_errorno;
#endif
/* PUBLIC */
void (*on_connect) (struct evcom_stream *);
void (*on_read) (struct evcom_stream *, const void *buf, size_t count);
void (*on_drain) (struct evcom_stream *);
void (*on_timeout) (struct evcom_stream *);
} evcom_stream;
void evcom_server_init (evcom_server *);
int evcom_server_listen (evcom_server *, struct sockaddr *address, int backlog);
@ -97,7 +180,7 @@ void evcom_stream_full_close (evcom_stream *);
/* The most extreme measure.
* Will not wait for the write queue to complete.
*/
void evcom_stream_force_close (evcom_stream *);
void evcom_stream_force_close (evcom_stream *);
#if EVCOM_HAVE_GNUTLS
@ -110,92 +193,12 @@ void evcom_stream_force_close (evcom_stream *);
void evcom_stream_set_secure_session (evcom_stream *, gnutls_session_t);
#endif
enum evcom_stream_state evcom_stream_state (evcom_stream *stream);
evcom_buf * evcom_buf_new (const char* base, size_t len);
evcom_buf * evcom_buf_new2 (size_t len);
void evcom_buf_destroy (evcom_buf *);
struct evcom_queue {
evcom_queue *prev;
evcom_queue *next;
};
struct evcom_buf {
/* public */
char *base;
size_t len;
void (*release) (evcom_buf *); /* called when oi is done with the object */
void *data;
/* private */
size_t written;
evcom_queue queue;
};
struct evcom_server {
/* read only */
int fd;
#if EV_MULTIPLICITY
struct ev_loop *loop;
#endif
unsigned flags;
/* PRIVATE */
ev_io connection_watcher;
/* PUBLIC */
evcom_stream* (*on_connection) (evcom_server *, struct sockaddr *remote_addr);
/* Executed when a server is closed.
* If evcom_server_close() was called errorno will be 0.
* An libev error is indicated with errorno == 1
* Otherwise errorno is a stdlib errno from a system call, e.g. accept()
*/
void (*on_close) (evcom_server *, int errorno);
void *data;
};
struct evcom_stream {
/* read only */
int fd;
#if EV_MULTIPLICITY
struct ev_loop *loop;
#endif
evcom_server *server;
evcom_queue out_stream;
size_t written;
unsigned flags;
/* NULL = that end of the stream is closed. */
int (*read_action) (evcom_stream *);
int (*write_action) (evcom_stream *);
/* ERROR CODES. 0 = no error. Check on_close. */
int errorno;
#if EVCOM_HAVE_GNUTLS
int gnutls_errorno;
#endif
/* private */
ev_io write_watcher;
ev_io read_watcher;
ev_timer timeout_watcher;
#if EVCOM_HAVE_GNUTLS
gnutls_session_t session;
#endif
/* public */
size_t chunksize; /* the maximum chunk that on_read() will return */
void (*on_connect) (evcom_stream *);
void (*on_read) (evcom_stream *, const void *buf, size_t count);
void (*on_drain) (evcom_stream *);
void (*on_close) (evcom_stream *);
void (*on_timeout) (evcom_stream *);
void *data;
};
EV_INLINE void
evcom_queue_init (evcom_queue *q)
{

42
deps/evcom/test/test.c

@ -27,22 +27,26 @@ static int use_tls;
static int got_server_close;
static void
common_on_server_close (evcom_server *server, int errorno)
common_on_server_close (evcom_server *s)
{
assert(server);
assert(errorno == 0);
printf("server on_close\n");
assert(s == &server);
assert(s->errorno == 0);
got_server_close = 1;
evcom_server_detach(s);
}
static void
common_on_peer_close (evcom_stream *stream)
{
assert(EVCOM_CLOSED == evcom_stream_state(stream));
assert(stream->errorno == 0);
printf("server connection closed\n");
#if EVCOM_HAVE_GNUTLS
assert(stream->gnutls_errorno == 0);
if (use_tls) gnutls_deinit(stream->session);
#endif
evcom_stream_detach(stream);
free(stream);
}
@ -106,7 +110,7 @@ void anon_tls_client (evcom_stream *stream)
#define PING "PING"
#define PONG "PONG"
#define EXCHANGES 5000
#define EXCHANGES 500
#define PINGPONG_TIMEOUT 5.0
static int successful_ping_count;
@ -130,9 +134,11 @@ pingpong_on_peer_read (evcom_stream *stream, const void *base, size_t len)
static void
pingpong_on_client_close (evcom_stream *stream)
{
assert(EVCOM_CLOSED == evcom_stream_state(stream));
assert(stream);
printf("client connection closed\n");
evcom_server_close(&server);
evcom_stream_detach(stream);
}
static evcom_stream*
@ -147,6 +153,8 @@ pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr)
stream->on_close = common_on_peer_close;
stream->on_timeout = common_on_peer_timeout;
assert(EVCOM_INITIALIZED == evcom_stream_state(stream));
nconnections++;
#if EVCOM_HAVE_GNUTLS
@ -163,6 +171,7 @@ pingpong_on_client_connect (evcom_stream *stream)
{
printf("client connected. sending ping\n");
evcom_stream_write_simple(stream, PING, sizeof PING);
assert(EVCOM_CONNECTED_RW == evcom_stream_state(stream));
}
static void
@ -219,6 +228,8 @@ pingpong (struct sockaddr *address)
client.on_close = pingpong_on_client_close;
client.on_timeout = common_on_client_timeout;
assert(EVCOM_INITIALIZED == evcom_stream_state(&client));
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_client(&client);
#endif
@ -239,35 +250,28 @@ pingpong (struct sockaddr *address)
#define NCONN 100
#define CONNINT_TIMEOUT 1000.0
#define NCONN 50
#define CONNINT_TIMEOUT 10.0
static void
connint_on_peer_read(evcom_stream *stream, const void *base, size_t len)
send_bye_and_close(evcom_stream *stream, const void *base, size_t len)
{
assert(base);
assert(len == 0);
evcom_stream_write_simple(stream, "BYE", 3);
printf("server wrote bye\n");
}
static void
connint_on_peer_drain(evcom_stream *stream)
{
evcom_stream_close(stream);
}
static evcom_stream*
connint_on_server_connection(evcom_server *_server, struct sockaddr *addr)
connint_on_connection(evcom_server *_server, struct sockaddr *addr)
{
assert(_server == &server);
assert(addr);
evcom_stream *stream = malloc(sizeof(evcom_stream));
evcom_stream_init(stream, CONNINT_TIMEOUT);
stream->on_read = connint_on_peer_read;
stream->on_drain = connint_on_peer_drain;
stream->on_read = send_bye_and_close;
stream->on_close = common_on_peer_close;
stream->on_timeout = common_on_peer_timeout;
@ -300,6 +304,8 @@ connint_on_client_close (evcom_stream *stream)
evcom_server_close(&server);
printf("closing server\n");
}
evcom_stream_detach(stream);
}
static void
@ -329,10 +335,9 @@ connint (struct sockaddr *address)
got_server_close = 0;
evcom_server_init(&server);
server.on_connection = connint_on_server_connection;
server.on_connection = connint_on_connection;
server.on_close = common_on_server_close;
evcom_server_listen(&server, address, 1000);
evcom_server_attach(EV_DEFAULT_ &server);
@ -443,6 +448,5 @@ main (void)
free_unix_address(unix_address);
#endif
return 0;
}

6
src/http.js

@ -423,8 +423,6 @@ node.http.createClient = function (port, host) {
};
client.addListener("connect", function () {
//node.debug("HTTP CLIENT onConnect. readyState = " + client.readyState);
//node.debug("client.requests[0].uri = '" + client.requests[0].uri + "'");
requests[0].flush();
});
@ -439,11 +437,11 @@ node.http.createClient = function (port, host) {
return;
}
//node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState);
node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState);
// If there are more requests to handle, reconnect.
if (requests.length > 0 && client.readyState != "opening") {
//node.debug("HTTP CLIENT: reconnecting readyState = " + client.readyState);
node.debug("HTTP CLIENT: reconnecting readyState = " + client.readyState);
client.connect(port, host); // reconnect
}
});

75
src/net.cc

@ -29,6 +29,7 @@ using namespace node;
#define OPENING_SYMBOL String::NewSymbol("opening")
#define READ_ONLY_SYMBOL String::NewSymbol("readOnly")
#define WRITE_ONLY_SYMBOL String::NewSymbol("writeOnly")
#define CLOSING_SYMBOL String::NewSymbol("closing")
#define CLOSED_SYMBOL String::NewSymbol("closed")
static const struct addrinfo server_tcp_hints =
@ -84,12 +85,16 @@ Connection::ReadyStateGetter (Local<String> property, const AccessorInfo& info)
assert(property == READY_STATE_SYMBOL);
switch(connection->ReadyState()) {
case OPEN: return scope.Close(OPEN_SYMBOL);
case OPENING: return scope.Close(OPENING_SYMBOL);
case CLOSED: return scope.Close(CLOSED_SYMBOL);
case READ_ONLY: return scope.Close(READ_ONLY_SYMBOL);
case WRITE_ONLY: return scope.Close(WRITE_ONLY_SYMBOL);
if (connection->resolving_) return scope.Close(OPENING_SYMBOL);
switch (evcom_stream_state(&connection->stream_)) {
case EVCOM_INITIALIZED: return scope.Close(CLOSED_SYMBOL);
case EVCOM_CONNECTING: return scope.Close(OPENING_SYMBOL);
case EVCOM_CONNECTED_RW: return scope.Close(OPEN_SYMBOL);
case EVCOM_CONNECTED_RO: return scope.Close(READ_ONLY_SYMBOL);
case EVCOM_CONNECTED_WO: return scope.Close(WRITE_ONLY_SYMBOL);
case EVCOM_CLOSING: return scope.Close(CLOSING_SYMBOL);
case EVCOM_CLOSED: return scope.Close(CLOSED_SYMBOL);
}
assert(0 && "This shouldnt happen");
@ -99,12 +104,11 @@ Connection::ReadyStateGetter (Local<String> property, const AccessorInfo& info)
void
Connection::Init (void)
{
opening = false;
resolving_ = false;
double timeout = 60.0; // default
evcom_stream_init(&stream_, timeout);
stream_.on_connect = Connection::on_connect;
stream_.on_read = Connection::on_read;
stream_.on_drain = Connection::on_drain;
stream_.on_close = Connection::on_close;
stream_.on_timeout = Connection::on_timeout;
stream_.data = this;
@ -127,30 +131,6 @@ Connection::New (const Arguments& args)
return args.This();
}
enum Connection::readyState
Connection::ReadyState (void)
{
if (stream_.flags & EVCOM_GOT_FULL_CLOSE)
return CLOSED;
if (stream_.flags & EVCOM_GOT_HALF_CLOSE)
return (stream_.read_action == NULL ? CLOSED : READ_ONLY);
if (stream_.read_action && stream_.write_action)
return OPEN;
else if (stream_.write_action)
return WRITE_ONLY;
else if (stream_.read_action)
return READ_ONLY;
else if (opening)
return OPENING;
return CLOSED;
}
Handle<Value>
Connection::Connect (const Arguments& args)
{
@ -160,16 +140,16 @@ Connection::Connect (const Arguments& args)
HandleScope scope;
if (connection->ReadyState() != CLOSED) {
if (connection->ReadyState() == EVCOM_CLOSED) {
connection->Init(); // in case we're reusing the socket
assert(connection->ReadyState() == EVCOM_INITIALIZED);
}
if (connection->ReadyState() != EVCOM_INITIALIZED) {
return ThrowException(String::New("Socket is not in CLOSED state."));
} else {
// XXX ugly.
connection->Init(); // in case we're reusing the socket... ?
}
assert(connection->stream_.fd < 0);
assert(connection->stream_.read_action == NULL);
assert(connection->stream_.write_action == NULL);
if (args.Length() == 0)
return ThrowException(String::New("Must specify a port."));
@ -184,7 +164,7 @@ Connection::Connect (const Arguments& args)
connection->host_ = strdup(*host_sv);
}
connection->opening = true;
connection->resolving_ = true;
ev_ref(EV_DEFAULT_UC);
@ -220,7 +200,7 @@ Connection::Resolve (eio_req *req)
struct addrinfo *address = NULL;
assert(connection->attached_);
assert(connection->opening);
assert(connection->resolving_);
req->result = getaddrinfo(connection->host_, connection->port_,
&client_tcp_hints, &address);
@ -258,7 +238,7 @@ Connection::AfterResolve (eio_req *req)
Connection *connection = static_cast<Connection*> (req->data);
assert(connection->opening);
assert(connection->resolving_);
assert(connection->attached_);
struct addrinfo *address = NULL,
@ -266,7 +246,7 @@ Connection::AfterResolve (eio_req *req)
address = AddressDefaultToIPv4(address_list);
connection->opening = false;
connection->resolving_ = false;
int r = 0;
if (req->result == 0) r = connection->Connect(address->ai_addr);
@ -274,7 +254,7 @@ Connection::AfterResolve (eio_req *req)
if (address_list) freeaddrinfo(address_list);
// no error. return.
if (r == 0 && req->result == 0) {
if (req->result == 0) {
evcom_stream_attach (EV_DEFAULT_UC_ &connection->stream_);
goto out;
}
@ -285,7 +265,7 @@ Connection::AfterResolve (eio_req *req)
* The fact that I'm modifying a read-only variable here should be
* good evidence of this.
*/
connection->stream_.errorno = r | req->result;
connection->stream_.errorno = req->result;
connection->OnDisconnect();
@ -395,10 +375,12 @@ Connection::Send (const Arguments& args)
Connection *connection = ObjectWrap::Unwrap<Connection>(args.Holder());
assert(connection);
if ( connection->ReadyState() != OPEN
&& connection->ReadyState() != WRITE_ONLY
if ( connection->ReadyState() != EVCOM_CONNECTED_RW
&& connection->ReadyState() != EVCOM_CONNECTED_WO
)
{
return ThrowException(String::New("Socket is not open for writing"));
}
// XXX
// A lot of improvement can be made here. First of all we're allocating
@ -492,7 +474,6 @@ void name () \
}
DEFINE_SIMPLE_CALLBACK(Connection::OnConnect, "connect")
DEFINE_SIMPLE_CALLBACK(Connection::OnDrain, "drain")
DEFINE_SIMPLE_CALLBACK(Connection::OnTimeout, "timeout")
DEFINE_SIMPLE_CALLBACK(Connection::OnEOF, "eof")

23
src/net.h

@ -56,18 +56,19 @@ protected:
virtual void OnConnect (void);
virtual void OnReceive (const void *buf, size_t len);
virtual void OnDrain (void);
virtual void OnEOF (void);
virtual void OnDisconnect (void);
virtual void OnTimeout (void);
v8::Local<v8::Object> GetProtocol (void);
enum evcom_stream_state ReadyState ( ) {
return evcom_stream_state(&stream_);
}
enum encoding encoding_;
enum readyState { OPEN, OPENING, CLOSED, READ_ONLY, WRITE_ONLY };
bool opening;
enum readyState ReadyState (void);
bool resolving_;
private:
@ -86,17 +87,12 @@ private:
connection->OnReceive(buf, len);
}
static void on_drain (evcom_stream *s) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnDrain();
}
static void on_close (evcom_stream *s) {
Connection *connection = static_cast<Connection*> (s->data);
evcom_stream_detach(s);
assert(connection->stream_.fd < 0);
assert(connection->stream_.read_action == NULL);
assert(connection->stream_.write_action == NULL);
connection->OnDisconnect();
@ -171,9 +167,10 @@ private:
}
void OnClose (int errorno);
static void on_close (evcom_server *s, int errorno) {
static void on_close (evcom_server *s) {
Server *server = static_cast<Server*> (s->data);
server->OnClose(errorno);
evcom_server_detach(s);
server->OnClose(s->errorno);
server->Detach();
}

17
test/mjsunit/test-http-cat.js

@ -3,6 +3,7 @@ PORT = 8888;
var body = "exports.A = function() { return 'A';}";
var server = node.http.createServer(function (req, res) {
puts("got request");
res.sendHeader(200, [
["Content-Length", body.length],
["Content-Type", "text/plain"]
@ -17,16 +18,16 @@ var bad_server_got_error = false;
function onLoad () {
node.http.cat("http://localhost:"+PORT+"/", "utf8").addCallback(function (content) {
puts("got response");
got_good_server_content = true;
assertEquals(body, content);
server.close();
});
puts("got response");
got_good_server_content = true;
assertEquals(body, content);
server.close();
});
node.http.cat("http://localhost:12312/", "utf8").addErrback(function () {
puts("got error (this should happen)");
bad_server_got_error = true;
});
puts("got error (this should happen)");
bad_server_got_error = true;
});
}
function onExit () {

Loading…
Cancel
Save