Browse Source

Upgrade liboi, which is now called evnet.

v0.7.4-release
Ryan 16 years ago
parent
commit
c5ab0d5a80
  1. 464
      deps/evnet/evnet.c
  2. 232
      deps/evnet/evnet.h
  3. 102
      deps/evnet/test/echo.c
  4. 478
      deps/evnet/test/test.c
  5. 6
      deps/evnet/test/timeout.rb
  6. 216
      deps/liboi/oi_socket.h
  7. 106
      deps/liboi/test/common.c
  8. 154
      deps/liboi/test/connection_interruption.c
  9. 97
      deps/liboi/test/echo.c
  10. 164
      deps/liboi/test/ping_pong.c
  11. 16
      src/net.cc
  12. 42
      src/net.h
  13. 10
      src/node.cc
  14. 4
      src/node.h
  15. 28
      src/process.cc
  16. 6
      src/process.h
  17. 22
      wscript

464
deps/liboi/oi_socket.c → deps/evnet/evnet.c

@ -1,6 +1,6 @@
/* Copyright (c) 2008,2009 Ryan Dahl
*
* oi_queue comes from ngx_queue.h
* evnet_queue comes from ngx_queue.h
* Copyright (C) 2002-2009 Igor Sysoev
*
* Redistribution and use in source and binary forms, with or without
@ -27,9 +27,9 @@
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h> /* close() */
#include <fcntl.h> /* fcntl() */
#include <errno.h> /* for the default methods */
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h> /* memset */
#include <netinet/tcp.h> /* TCP_NODELAY */
@ -37,7 +37,7 @@
#include <sys/socket.h> /* shutdown */
#include <ev.h>
#include <oi_socket.h>
#include <evnet.h>
#if EV_MULTIPLICITY
# define SOCKET_LOOP_ socket->loop,
@ -47,15 +47,10 @@
# define SERVER_LOOP_
#endif // EV_MULTIPLICITY
#if HAVE_GNUTLS
# include <gnutls/gnutls.h>
/* a few forwards
* they wont even be defined if not having gnutls
* */
static int secure_full_goodbye (oi_socket *socket);
static int secure_half_goodbye (oi_socket *socket);
#endif // HAVE_GNUTLS
#if EVNET_HAVE_GNUTLS
static int secure_full_goodbye (evnet_socket *socket);
static int secure_half_goodbye (evnet_socket *socket);
#endif
#undef TRUE
#define TRUE 1
@ -68,36 +63,47 @@ static int secure_half_goodbye (oi_socket *socket);
#define AGAIN 1
#define ERROR 2
EV_INLINE int
set_nonblock (int fd)
{
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) return -1;
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (r == -1) return -1;
return 0;
}
void
oi_buf_destroy (oi_buf *buf)
evnet_buf_destroy (evnet_buf *buf)
{
free(buf->base);
free(buf);
}
oi_buf *
oi_buf_new2 (size_t len)
evnet_buf *
evnet_buf_new2 (size_t len)
{
oi_buf *buf = malloc(sizeof(oi_buf));
if(!buf)
return NULL;
evnet_buf *buf = malloc(sizeof(evnet_buf));
if (!buf) return NULL;
buf->base = malloc(len);
if (!buf->base) {
free(buf);
return NULL;
}
buf->len = len;
buf->release = oi_buf_destroy;
buf->release = evnet_buf_destroy;
return buf;
}
oi_buf *
oi_buf_new (const char *base, size_t len)
evnet_buf *
evnet_buf_new (const char *base, size_t len)
{
oi_buf *buf = oi_buf_new2(len);
if(!buf)
return NULL;
evnet_buf *buf = evnet_buf_new2(len);
if (!buf) return NULL;
memcpy(buf->base, base, len);
return buf;
}
@ -107,13 +113,13 @@ oi_buf_new (const char *base, size_t len)
} while (0)
static int
full_close(oi_socket *socket)
full_close (evnet_socket *socket)
{
//printf("close(%d)\n", socket->fd);
if (close(socket->fd) == -1) {
if (errno == EINTR)
if (errno == EINTR) {
return AGAIN;
else {
} else {
socket->errorno = errno;
return ERROR;
}
@ -122,11 +128,12 @@ full_close(oi_socket *socket)
socket->read_action = NULL;
socket->write_action = NULL;
socket->fd = -1;
return OKAY;
}
static int
half_close(oi_socket *socket)
half_close (evnet_socket *socket)
{
int r = shutdown(socket->fd, SHUT_WR);
if (r == -1) {
@ -151,14 +158,15 @@ half_close(oi_socket *socket)
// This is to be called when ever the out_stream is empty
// and we need to change state.
static void
change_state_for_empty_out_stream (oi_socket *socket)
change_state_for_empty_out_stream (evnet_socket *socket)
{
/*
* a very complicated bunch of close logic!
* XXX this is awful. FIXME
*/
if (socket->write_action == full_close || socket->read_action == full_close)
if (socket->write_action == full_close || socket->read_action == full_close) {
return;
}
if (socket->got_half_close == FALSE) {
if (socket->got_full_close == FALSE) {
@ -166,48 +174,51 @@ change_state_for_empty_out_stream (oi_socket *socket)
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
} else {
/* Got Full Close. */
if (socket->read_action)
#if HAVE_GNUTLS
if (socket->read_action) {
#if EVNET_HAVE_GNUTLS
socket->read_action = socket->secure ? secure_full_goodbye : full_close;
#else
socket->read_action = full_close;
#endif
}
if (socket->write_action)
#if HAVE_GNUTLS
if (socket->write_action) {
#if EVNET_HAVE_GNUTLS
socket->write_action = socket->secure ? secure_full_goodbye : full_close;
#else
socket->write_action = full_close;
#endif
}
}
} else {
/* Got Half Close. */
if (socket->write_action)
#if HAVE_GNUTLS
if (socket->write_action) {
#if EVNET_HAVE_GNUTLS
socket->write_action = socket->secure ? secure_half_goodbye : half_close;
#else
socket->write_action = half_close;
#endif
}
}
}
static void
update_write_buffer_after_send (oi_socket *socket, ssize_t sent)
update_write_buffer_after_send (evnet_socket *socket, ssize_t sent)
{
oi_queue *q = oi_queue_last(&socket->out_stream);
oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
evnet_queue *q = evnet_queue_last(&socket->out_stream);
evnet_buf *to_write = evnet_queue_data(q, evnet_buf, queue);
to_write->written += sent;
socket->written += sent;
if (to_write->written == to_write->len) {
oi_queue_remove(q);
evnet_queue_remove(q);
if (to_write->release) {
to_write->release(to_write);
}
if (oi_queue_empty(&socket->out_stream)) {
if (evnet_queue_empty(&socket->out_stream)) {
change_state_for_empty_out_stream(socket);
if (socket->on_drain)
socket->on_drain(socket);
@ -215,15 +226,15 @@ update_write_buffer_after_send (oi_socket *socket, ssize_t sent)
}
}
#if HAVE_GNUTLS
static int secure_socket_send(oi_socket *socket);
static int secure_socket_recv(oi_socket *socket);
#if EVNET_HAVE_GNUTLS
static int secure_socket_send (evnet_socket *socket);
static int secure_socket_recv (evnet_socket *socket);
/* TODO can this be done without ignoring SIGPIPE? */
static ssize_t
nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len)
{
oi_socket *socket = (oi_socket*)data;
evnet_socket *socket = (evnet_socket*)data;
assert(socket->secure);
int flags = 0;
#ifdef MSG_NOSIGNAL
@ -242,7 +253,7 @@ nosigpipe_push(gnutls_transport_ptr_t data, const void *buf, size_t len)
}
static int
secure_handshake(oi_socket *socket)
secure_handshake (evnet_socket *socket)
{
assert(socket->secure);
@ -256,51 +267,51 @@ secure_handshake(oi_socket *socket)
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
return AGAIN;
oi_socket_reset_timeout(socket);
evnet_socket_reset_timeout(socket);
if (!socket->connected) {
socket->connected = TRUE;
if (socket->on_connect) socket->on_connect(socket);
}
if (socket->read_action == secure_handshake)
if (socket->read_action == secure_handshake) {
socket->read_action = secure_socket_recv;
}
if (socket->write_action == secure_handshake)
if (socket->write_action == secure_handshake) {
socket->write_action = secure_socket_send;
}
return OKAY;
}
static int
secure_socket_send(oi_socket *socket)
secure_socket_send (evnet_socket *socket)
{
ssize_t sent;
if (oi_queue_empty(&socket->out_stream)) {
if (evnet_queue_empty(&socket->out_stream)) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
return AGAIN;
}
oi_queue *q = oi_queue_last(&socket->out_stream);
oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
evnet_queue *q = evnet_queue_last(&socket->out_stream);
evnet_buf *to_write = evnet_queue_data(q, evnet_buf, queue);
assert(socket->secure);
sent = gnutls_record_send( socket->session
, to_write->base + to_write->written
, to_write->len - to_write->written
);
sent = gnutls_record_send(socket->session,
to_write->base + to_write->written,
to_write->len - to_write->written);
if (gnutls_error_is_fatal(sent)) {
socket->gnutls_errorno = sent;
return ERROR;
}
if (sent == 0)
return AGAIN;
if (sent == 0) return AGAIN;
oi_socket_reset_timeout(socket);
evnet_socket_reset_timeout(socket);
if (sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) {
return AGAIN;
@ -319,7 +330,7 @@ secure_socket_send(oi_socket *socket)
}
static int
secure_socket_recv(oi_socket *socket)
secure_socket_recv (evnet_socket *socket)
{
char recv_buffer[socket->chunksize];
size_t recv_buffer_size = socket->chunksize;
@ -340,7 +351,7 @@ secure_socket_recv(oi_socket *socket)
return AGAIN;
}
oi_socket_reset_timeout(socket);
evnet_socket_reset_timeout(socket);
/* A server may also receive GNUTLS_E_REHANDSHAKE when a client has
* initiated a handshake. In that case the server can only initiate a
@ -361,14 +372,16 @@ secure_socket_recv(oi_socket *socket)
/* Got EOF */
if (recved == 0) {
socket->read_action = NULL;
if (socket->write_action == NULL)
CLOSE_ASAP(socket);
if (socket->write_action == NULL) CLOSE_ASAP(socket);
}
if (socket->write_action)
if (socket->write_action) {
socket->write_action = secure_socket_send;
}
if (socket->on_read) { socket->on_read(socket, recv_buffer, recved); }
if (socket->on_read) {
socket->on_read(socket, recv_buffer, recved);
}
return OKAY;
}
@ -378,7 +391,7 @@ secure_socket_recv(oi_socket *socket)
}
static int
secure_full_goodbye (oi_socket *socket)
secure_full_goodbye (evnet_socket *socket)
{
assert(socket->secure);
@ -389,8 +402,7 @@ secure_full_goodbye (oi_socket *socket)
return ERROR;
}
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
return AGAIN;
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN;
CLOSE_ASAP(socket);
@ -398,7 +410,7 @@ secure_full_goodbye (oi_socket *socket)
}
static int
secure_half_goodbye (oi_socket *socket)
secure_half_goodbye (evnet_socket *socket)
{
assert(socket->secure);
@ -409,17 +421,15 @@ secure_half_goodbye (oi_socket *socket)
return ERROR;
}
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
return AGAIN;
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN;
if (socket->write_action)
socket->write_action = half_close;
if (socket->write_action) socket->write_action = half_close;
return OKAY;
}
void
oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session)
evnet_socket_set_secure_session (evnet_socket *socket, gnutls_session_t session)
{
socket->session = session;
socket->secure = TRUE;
@ -427,19 +437,19 @@ oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session)
#endif /* HAVE GNUTLS */
static int
socket_send (oi_socket *socket)
socket_send (evnet_socket *socket)
{
ssize_t sent;
assert(socket->secure == FALSE);
if (oi_queue_empty(&socket->out_stream)) {
if (evnet_queue_empty(&socket->out_stream)) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
return AGAIN;
}
oi_queue *q = oi_queue_last(&socket->out_stream);
oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
evnet_queue *q = evnet_queue_last(&socket->out_stream);
evnet_buf *to_write = evnet_queue_data(q, evnet_buf, queue);
int flags = 0;
#ifdef MSG_NOSIGNAL
@ -451,11 +461,10 @@ socket_send (oi_socket *socket)
/* TODO use writev() here */
sent = send( socket->fd
, to_write->base + to_write->written
, to_write->len - to_write->written
, flags
);
sent = send(socket->fd,
to_write->base + to_write->written,
to_write->len - to_write->written,
flags);
if (sent < 0) {
switch (errno) {
@ -472,7 +481,7 @@ socket_send (oi_socket *socket)
}
}
oi_socket_reset_timeout(socket);
evnet_socket_reset_timeout(socket);
if (!socket->connected) {
socket->connected = TRUE;
@ -485,7 +494,7 @@ socket_send (oi_socket *socket)
}
static int
socket_recv (oi_socket *socket)
socket_recv (evnet_socket *socket)
{
char buf[TCP_MAXWIN];
size_t buf_size = TCP_MAXWIN;
@ -519,23 +528,22 @@ socket_recv (oi_socket *socket)
}
}
oi_socket_reset_timeout(socket);
evnet_socket_reset_timeout(socket);
if (recved == 0) {
oi_socket_read_stop(socket);
evnet_socket_read_stop(socket);
socket->read_action = NULL;
if (socket->write_action == NULL)
CLOSE_ASAP(socket);
if (socket->write_action == NULL) CLOSE_ASAP(socket);
}
/* NOTE: EOF is signaled with recved == 0 on callback */
if (socket->on_read) { socket->on_read(socket, buf, recved); }
if (socket->on_read) socket->on_read(socket, buf, recved);
return OKAY;
}
static void
assign_file_descriptor (oi_socket *socket, int fd)
assign_file_descriptor (evnet_socket *socket, int fd)
{
socket->fd = fd;
@ -545,100 +553,130 @@ assign_file_descriptor (oi_socket *socket, int fd)
socket->read_action = socket_recv;
socket->write_action = socket_send;
#if HAVE_GNUTLS
#if EVNET_HAVE_GNUTLS
if (socket->secure) {
gnutls_transport_set_lowat(socket->session, 0);
gnutls_transport_set_push_function(socket->session, nosigpipe_push);
gnutls_transport_set_ptr2 ( socket->session
/* recv */ , (gnutls_transport_ptr_t)fd
/* send */ , socket
);
gnutls_transport_set_ptr2(socket->session,
(gnutls_transport_ptr_t)fd, /* recv */
socket); /* send */
socket->read_action = secure_handshake;
socket->write_action = secure_handshake;
}
#endif
}
/* Internal callback
* Called by server->connection_watcher.
*/
static void
on_connection (EV_P_ ev_io *watcher, int revents)
server_close_with_error (evnet_server *server, int errorno)
{
oi_server *server = watcher->data;
// printf("on connection!\n");
assert(server->listening);
#if EV_MULTIPLICITY
assert(server->loop == loop);
#endif
assert(&server->connection_watcher == watcher);
if (server->listening) {
evnet_server_detach(server);
close(server->fd); /* TODO do this on the loop? check return value? */
server->fd = -1;
server->listening = FALSE;
if (EV_ERROR & revents) {
oi_server_close(server);
return;
if (server->on_close) {
server->on_close(server, errorno);
}
}
}
/* Retruns evnet_socket if a connection could be accepted.
* The returned socket is not yet attached to the event loop.
* Otherwise NULL
*/
static evnet_socket*
accept_connection (evnet_server *server)
{
struct sockaddr address; /* connector's address information */
socklen_t addr_len = sizeof(address);
/* TODO accept all possible connections? currently: just one */
int fd = accept(server->fd, (struct sockaddr*)&address, &addr_len);
if (fd < 0) {
perror("accept()");
return;
#ifdef EWOULDBLOCK
if (errno == EWOULDBLOCK) return NULL;
#else
if (errno == EAGAIN) return NULL;
#endif
goto error;
}
oi_socket *socket = NULL;
if (server->on_connection)
socket = server->on_connection(server, (struct sockaddr*)&address, addr_len);
evnet_socket *socket = NULL;
if (server->on_connection) {
socket = server->on_connection(server, (struct sockaddr*)&address);
}
if (socket == NULL) {
close(fd);
return;
return NULL;
}
int flags = fcntl(fd, F_GETFL, 0);
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (r < 0) {
/* TODO error report */
}
if (set_nonblock(fd) != 0) goto error;
#ifdef SO_NOSIGPIPE
flags = 1;
setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
r = setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
if (r < 0) goto error;
#endif
socket->server = server;
assign_file_descriptor(socket, fd);
oi_socket_attach(EV_A_ socket);
return socket;
error:
server_close_with_error(server, errno);
return NULL;
}
/* Internal callback
* Called by server->connection_watcher.
*/
static void
on_connection (EV_P_ ev_io *watcher, int revents)
{
evnet_server *server = watcher->data;
assert(server->listening);
#if EV_MULTIPLICITY
assert(server->loop == loop);
#endif
assert(&server->connection_watcher == watcher);
if (EV_ERROR & revents) {
server_close_with_error(server, 1);
return;
}
/* accept as many connections as possible */
evnet_socket *socket;
while ((socket = accept_connection(server))) {
evnet_socket_attach(EV_A_ socket);
}
}
int
oi_server_listen(oi_server *server, struct addrinfo *addrinfo)
evnet_server_listen (evnet_server *server, struct addrinfo *addrinfo, int backlog)
{
int fd = -1;
assert(server->listening == FALSE);
fd = socket( addrinfo->ai_family
, addrinfo->ai_socktype
, addrinfo->ai_protocol
);
fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd < 0) {
perror("socket()");
return -1;
}
int flags = fcntl(fd, F_GETFL, 0);
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (r < 0) {
perror("fcntl()");
if (set_nonblock(fd) != 0) {
perror("set_nonblock()");
return -1;
}
flags = 1;
int flags = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
@ -653,7 +691,7 @@ oi_server_listen(oi_server *server, struct addrinfo *addrinfo)
return -1;
}
if (listen(fd, server->backlog) < 0) {
if (listen(fd, backlog) < 0) {
perror("listen()");
close(fd);
return -1;
@ -671,18 +709,13 @@ oi_server_listen(oi_server *server, struct addrinfo *addrinfo)
* existing connections.
*/
void
oi_server_close(oi_server *server)
evnet_server_close (evnet_server *server)
{
if (server->listening) {
oi_server_detach(server);
close(server->fd);
/* TODO do this on the loop? check return value? */
server->listening = FALSE;
}
server_close_with_error(server, 0);
}
void
oi_server_attach (EV_P_ oi_server *server)
evnet_server_attach (EV_P_ evnet_server *server)
{
ev_io_start (EV_A_ &server->connection_watcher);
#if EV_MULTIPLICITY
@ -692,7 +725,7 @@ oi_server_attach (EV_P_ oi_server *server)
}
void
oi_server_detach (oi_server *server)
evnet_server_detach (evnet_server *server)
{
ev_io_stop (SERVER_LOOP_ &server->connection_watcher);
#if EV_MULTIPLICITY
@ -702,42 +735,43 @@ oi_server_detach (oi_server *server)
}
void
oi_server_init(oi_server *server, int backlog)
evnet_server_init (evnet_server *server)
{
server->backlog = backlog;
server->attached = FALSE;
server->listening = FALSE;
server->fd = -1;
server->connection_watcher.data = server;
ev_init (&server->connection_watcher, on_connection);
server->on_connection = NULL;
server->on_error = NULL;
server->data = NULL;
server->on_close = NULL;
}
/* Internal callback. called by socket->timeout_watcher */
static void
on_timeout (EV_P_ ev_timer *watcher, int revents)
{
oi_socket *socket = watcher->data;
evnet_socket *socket = watcher->data;
#if EV_MULTIPLICITY
assert(socket->loop == loop);
#endif
assert(revents == EV_TIMEOUT);
assert(watcher == &socket->timeout_watcher);
// printf("on_timeout\n");
if (socket->on_timeout) { socket->on_timeout(socket); }
if (socket->on_timeout) socket->on_timeout(socket);
// timeout does not automatically kill your connection. you must!
}
static void
release_write_buffer(oi_socket *socket)
release_write_buffer(evnet_socket *socket)
{
while (!oi_queue_empty(&socket->out_stream)) {
oi_queue *q = oi_queue_last(&socket->out_stream);
oi_buf *buf = oi_queue_data(q, oi_buf, queue);
oi_queue_remove(q);
if (buf->release) { buf->release(buf); }
while (!evnet_queue_empty(&socket->out_stream)) {
evnet_queue *q = evnet_queue_last(&socket->out_stream);
evnet_buf *buf = evnet_queue_data(q, evnet_buf, queue);
evnet_queue_remove(q);
if (buf->release) buf->release(buf);
}
}
@ -745,7 +779,7 @@ release_write_buffer(oi_socket *socket)
static void
on_io_event(EV_P_ ev_io *watcher, int revents)
{
oi_socket *socket = watcher->data;
evnet_socket *socket = watcher->data;
if (revents & EV_ERROR) {
socket->errorno = 1;
@ -758,27 +792,29 @@ on_io_event(EV_P_ ev_io *watcher, int revents)
while (have_read_event || have_write_event) {
/* RECV LOOP - TRY TO CLEAR THE BUFFER */
if (socket->read_action == NULL)
if (socket->read_action == NULL) {
have_read_event = FALSE;
else {
} else {
r = socket->read_action(socket);
if (r == AGAIN)
if (r == AGAIN) {
have_read_event = FALSE;
else if (r == ERROR)
CLOSE_ASAP(socket);
} else {
if (r == ERROR) CLOSE_ASAP(socket);
}
}
/* SEND LOOP - TRY TO CLEAR THE BUFFER */
if (socket->write_action == NULL)
if (socket->write_action == NULL) {
have_write_event = FALSE;
else {
} else {
r = socket->write_action(socket);
if (r == AGAIN)
if (r == AGAIN) {
have_write_event = FALSE;
else if (r == ERROR)
CLOSE_ASAP(socket);
} else {
if (r == ERROR) CLOSE_ASAP(socket);
}
}
}
@ -790,10 +826,10 @@ on_io_event(EV_P_ ev_io *watcher, int revents)
ev_clear_pending (EV_A_ &socket->read_watcher);
ev_clear_pending (EV_A_ &socket->timeout_watcher);
oi_socket_detach(socket);
evnet_socket_detach(socket);
assert(socket->fd == -1);
if (socket->on_close) { socket->on_close(socket); }
if (socket->on_close) socket->on_close(socket);
/* WARNING: user can free socket in on_close so no more
* access beyond this point. */
}
@ -807,7 +843,7 @@ on_io_event(EV_P_ ev_io *watcher, int revents)
* gnutls_db_set_ptr (socket->session, _);
*/
void
oi_socket_init(oi_socket *socket, float timeout)
evnet_socket_init (evnet_socket *socket, float timeout)
{
socket->fd = -1;
socket->server = NULL;
@ -817,7 +853,7 @@ oi_socket_init(oi_socket *socket, float timeout)
socket->attached = FALSE;
socket->connected = FALSE;
oi_queue_init(&socket->out_stream);
evnet_queue_init(&socket->out_stream);
ev_init(&socket->write_watcher, on_io_event);
socket->write_watcher.data = socket;
@ -831,7 +867,7 @@ oi_socket_init(oi_socket *socket, float timeout)
socket->errorno = 0;
socket->secure = FALSE;
#if HAVE_GNUTLS
#if EVNET_HAVE_GNUTLS
socket->gnutls_errorno = 0;
socket->session = NULL;
#endif
@ -842,7 +878,8 @@ oi_socket_init(oi_socket *socket, float timeout)
socket->read_action = NULL;
socket->write_action = NULL;
socket->chunksize = TCP_MAXWIN;
socket->chunksize = 8192;
socket->on_connect = NULL;
socket->on_read = NULL;
socket->on_drain = NULL;
@ -850,22 +887,24 @@ oi_socket_init(oi_socket *socket, float timeout)
}
void
oi_socket_close (oi_socket *socket)
evnet_socket_close (evnet_socket *socket)
{
socket->got_half_close = TRUE;
if (oi_queue_empty(&socket->out_stream))
if (evnet_queue_empty(&socket->out_stream)) {
change_state_for_empty_out_stream(socket);
}
}
void
oi_socket_full_close (oi_socket *socket)
evnet_socket_full_close (evnet_socket *socket)
{
socket->got_full_close = TRUE;
if (oi_queue_empty(&socket->out_stream))
if (evnet_queue_empty(&socket->out_stream)) {
change_state_for_empty_out_stream(socket);
}
}
void oi_socket_force_close (oi_socket *socket)
void evnet_socket_force_close (evnet_socket *socket)
{
release_write_buffer(socket);
@ -874,19 +913,16 @@ void oi_socket_force_close (oi_socket *socket)
ev_clear_pending (SOCKET_LOOP_ &socket->timeout_watcher);
socket->write_action = socket->read_action = NULL;
// socket->errorno = OI_SOCKET_ERROR_FORCE_CLOSE
//
// socket->errorno = EVNET_SOCKET_ERROR_FORCE_CLOSE
if (socket->fd > 0) {
close(socket->fd);
}
if (socket->fd > 0) close(socket->fd);
socket->fd = -1;
oi_socket_detach(socket);
evnet_socket_detach(socket);
}
void
oi_socket_write(oi_socket *socket, oi_buf *buf)
evnet_socket_write (evnet_socket *socket, evnet_buf *buf)
{
if (socket->write_action == NULL) {
assert(0 && "Do not write to a closed socket");
@ -897,7 +933,7 @@ oi_socket_write(oi_socket *socket, oi_buf *buf)
goto error;
}
oi_queue_insert_head(&socket->out_stream, &buf->queue);
evnet_queue_insert_head(&socket->out_stream, &buf->queue);
buf->written = 0;
if (socket->attached) {
@ -910,13 +946,13 @@ error:
}
void
oi_socket_reset_timeout(oi_socket *socket)
evnet_socket_reset_timeout (evnet_socket *socket)
{
ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher);
}
static void
free_simple_buf ( oi_buf *buf )
free_simple_buf (evnet_buf *buf)
{
free(buf->base);
free(buf);
@ -926,18 +962,18 @@ free_simple_buf ( oi_buf *buf )
* NOTE: Allocates memory. Avoid for performance applications.
*/
void
oi_socket_write_simple(oi_socket *socket, const char *str, size_t len)
evnet_socket_write_simple (evnet_socket *socket, const char *str, size_t len)
{
oi_buf *buf = malloc(sizeof(oi_buf));
evnet_buf *buf = malloc(sizeof(evnet_buf));
buf->release = free_simple_buf;
buf->base = strdup(str);
buf->len = len;
oi_socket_write(socket, buf);
evnet_socket_write(socket, buf);
}
void
oi_socket_attach(EV_P_ oi_socket *socket)
evnet_socket_attach (EV_P_ evnet_socket *socket)
{
#if EV_MULTIPLICITY
socket->loop = loop;
@ -946,15 +982,17 @@ oi_socket_attach(EV_P_ oi_socket *socket)
ev_timer_again(EV_A_ &socket->timeout_watcher);
if (socket->read_action)
if (socket->read_action) {
ev_io_start(EV_A_ &socket->read_watcher);
}
if (socket->write_action)
if (socket->write_action) {
ev_io_start(EV_A_ &socket->write_watcher);
}
}
void
oi_socket_detach(oi_socket *socket)
evnet_socket_detach (evnet_socket *socket)
{
if (socket->attached) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
@ -968,14 +1006,14 @@ oi_socket_detach(oi_socket *socket)
}
void
oi_socket_read_stop (oi_socket *socket)
evnet_socket_read_stop (evnet_socket *socket)
{
ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
ev_clear_pending(SOCKET_LOOP_ &socket->read_watcher);
}
void
oi_socket_read_start (oi_socket *socket)
evnet_socket_read_start (evnet_socket *socket)
{
if (socket->read_action) {
ev_io_start(SOCKET_LOOP_ &socket->read_watcher);
@ -984,21 +1022,18 @@ oi_socket_read_start (oi_socket *socket)
}
int
oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo)
evnet_socket_connect (evnet_socket *s, struct addrinfo *addrinfo)
{
int fd = socket( addrinfo->ai_family
, addrinfo->ai_socktype
, addrinfo->ai_protocol
);
int fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd < 0) {
perror("socket()");
return -1;
}
int flags = fcntl(fd, F_GETFL, 0);
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
int r = set_nonblock(fd);
if (r < 0) {
perror("fcntl()");
perror("set_nonblock()");
return -1;
}
@ -1007,10 +1042,7 @@ oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo)
setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
#endif
r = connect( fd
, addrinfo->ai_addr
, addrinfo->ai_addrlen
);
r = connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen);
if (r < 0 && errno != EINPROGRESS) {
perror("connect");

232
deps/evnet/evnet.h

@ -0,0 +1,232 @@
/* Copyright (c) 2008,2009 Ryan Dahl
*
* evnet_queue comes from Nginx, ngx_queue.h
* Copyright (C) 2002-2009 Igor Sysoev
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <netdb.h>
#include <ev.h>
#include <stddef.h> /* offsetof() */
#ifndef evnet_h
#define evnet_h
#ifdef __cplusplus
extern "C" {
#endif
#ifndef EVNET_HAVE_GNUTLS
# define EVNET_HAVE_GNUTLS 0
#endif
#if EVNET_HAVE_GNUTLS
# include <gnutls/gnutls.h>
#endif
typedef struct evnet_queue evnet_queue;
typedef struct evnet_buf evnet_buf;
typedef struct evnet_server evnet_server;
typedef struct evnet_socket evnet_socket;
void evnet_server_init (evnet_server *);
int evnet_server_listen (evnet_server *, struct addrinfo *addrinfo, int backlog);
void evnet_server_attach (EV_P_ evnet_server *);
void evnet_server_detach (evnet_server *);
void evnet_server_close (evnet_server *); // synchronous
void evnet_socket_init (evnet_socket *, float timeout);
int evnet_socket_connect (evnet_socket *, struct addrinfo *addrinfo);
void evnet_socket_attach (EV_P_ evnet_socket *);
void evnet_socket_detach (evnet_socket *);
void evnet_socket_read_start (evnet_socket *);
void evnet_socket_read_stop (evnet_socket *);
/* Resets the timeout to stay alive for another socket->timeout seconds
*/
void evnet_socket_reset_timeout (evnet_socket *);
/* Writes a buffer to the socket.
* (Do not send a NULL evnet_buf or a buffer with evnet_buf->base == NULL.)
*/
void evnet_socket_write (evnet_socket *, evnet_buf *);
void evnet_socket_write_simple (evnet_socket *, const char *str, size_t len);
/* Once the write buffer is drained, evnet_socket_close will shutdown the
* writing end of the socket and will close the read end once the server
* replies with an EOF.
*/
void evnet_socket_close (evnet_socket *);
/* Do not wait for the server to reply with EOF. This will only be called
* once the write buffer is drained.
* Warning: For TCP socket, the OS kernel may (should) reply with RST
* packets if this is called when data is still being received from the
* server.
*/
void evnet_socket_full_close (evnet_socket *);
/* The most extreme measure.
* Will not wait for the write queue to complete.
*/
void evnet_socket_force_close (evnet_socket *);
#if EVNET_HAVE_GNUTLS
/* Tells the socket to use transport layer security (SSL). evnet_socket does
* not want to make any decisions about security requirements, so the
* majoirty of GnuTLS configuration is left to the user. Only the transport
* layer of GnuTLS is controlled by evnet_socket. That is, do not use
* gnutls_transport_* functions. Do use the rest of GnuTLS's API.
*/
void evnet_socket_set_secure_session (evnet_socket *, gnutls_session_t);
#endif
evnet_buf * evnet_buf_new (const char* base, size_t len);
evnet_buf * evnet_buf_new2 (size_t len);
void evnet_buf_destroy (evnet_buf *);
struct evnet_queue {
evnet_queue *prev;
evnet_queue *next;
};
struct evnet_buf {
/* public */
char *base;
size_t len;
void (*release) (evnet_buf *); /* called when oi is done with the object */
void *data;
/* private */
size_t written;
evnet_queue queue;
};
struct evnet_server {
/* read only */
int fd;
#if EV_MULTIPLICITY
struct ev_loop *loop;
#endif
unsigned attached:1;
unsigned listening:1;
/* PRIVATE */
ev_io connection_watcher;
/* PUBLIC */
evnet_socket* (*on_connection) (evnet_server *, struct sockaddr *remote_addr);
/* Executed when a server is closed.
* If evnet_server_close() was called errorno will be 0.
* An libev error is indicated with errorno == 1
* Otherwise errorno is a stdlib errno from a system call, e.g. accept()
*/
void (*on_close) (evnet_server *, int errorno);
void *data;
};
struct evnet_socket {
/* read only */
int fd;
#if EV_MULTIPLICITY
struct ev_loop *loop;
#endif
evnet_server *server;
evnet_queue out_stream;
size_t written;
unsigned attached:1;
unsigned connected:1;
unsigned secure:1;
unsigned got_full_close:1;
unsigned got_half_close:1;
/* NULL = that end of the socket is closed. */
int (*read_action) (evnet_socket *);
int (*write_action) (evnet_socket *);
/* ERROR CODES. 0 = no error. Check on_close. */
int errorno;
#if EVNET_HAVE_GNUTLS
int gnutls_errorno;
#endif
/* private */
ev_io write_watcher;
ev_io read_watcher;
ev_timer timeout_watcher;
#if EVNET_HAVE_GNUTLS
gnutls_session_t session;
#endif
/* public */
size_t chunksize; /* the maximum chunk that on_read() will return */
void (*on_connect) (evnet_socket *);
void (*on_read) (evnet_socket *, const void *buf, size_t count);
void (*on_drain) (evnet_socket *);
void (*on_close) (evnet_socket *);
void (*on_timeout) (evnet_socket *);
void *data;
};
EV_INLINE void
evnet_queue_init (evnet_queue *q)
{
q->prev = q;
q->next = q;
}
EV_INLINE void
evnet_queue_insert_head (evnet_queue *h, evnet_queue *x)
{
(x)->next = (h)->next;
(x)->next->prev = x;
(x)->prev = h;
(h)->next = x;
}
EV_INLINE void
evnet_queue_remove (evnet_queue *x)
{
(x)->next->prev = (x)->prev;
(x)->prev->next = (x)->next;
#ifndef NDEBUG
(x)->prev = NULL;
(x)->next = NULL;
#endif
}
#define evnet_queue_empty(h) (h == (h)->prev)
#define evnet_queue_head(h) (h)->next
#define evnet_queue_last(h) (h)->prev
#define evnet_queue_data(q, type, link) \
(type *) ((unsigned char *) q - offsetof(type, link))
#ifdef __cplusplus
}
#endif
#endif /* evnet_h */

102
deps/evnet/test/echo.c

@ -0,0 +1,102 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <ev.h>
#include <evnet.h>
#include <gnutls/gnutls.h>
#define HOST "127.0.0.1"
#define SOCKFILE "/tmp/oi.sock"
#define PORT "5000"
static int nconnections;
static void
on_peer_close (evnet_socket *socket)
{
assert(socket->errorno == 0);
//printf("server connection closed\n");
free(socket);
}
static void
on_peer_timeout (evnet_socket *socket)
{
assert(socket);
fprintf(stderr, "peer connection timeout\n");
assert(0);
}
// timeout must match the timeout in timeout.rb
#define TIMEOUT 5.0
static void
on_peer_read (evnet_socket *socket, const void *base, size_t len)
{
if(len == 0) return;
evnet_socket_write_simple(socket, base, len);
}
static evnet_socket*
on_server_connection (evnet_server *server, struct sockaddr *addr)
{
assert(server);
assert(addr);
evnet_socket *socket = malloc(sizeof(evnet_socket));
evnet_socket_init(socket, TIMEOUT);
socket->on_read = on_peer_read;
socket->on_close = on_peer_close;
socket->on_timeout = on_peer_timeout;
nconnections++;
//printf("on server connection\n");
return socket;
}
int
main (void)
{
int r;
evnet_server server;
//printf("sizeof(evnet_server): %d\n", sizeof(evnet_server));
//printf("sizeof(evnet_socket): %d\n", sizeof(evnet_socket));
evnet_server_init(&server);
server.on_connection = on_server_connection;
struct addrinfo *servinfo;
struct addrinfo hints;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
r = getaddrinfo(NULL, PORT, &hints, &servinfo);
assert(r == 0);
r = evnet_server_listen(&server, servinfo, 10);
assert(r == 0);
evnet_server_attach(EV_DEFAULT_ &server);
ev_loop(EV_DEFAULT_ 0);
freeaddrinfo(servinfo);
return 0;
}

478
deps/evnet/test/test.c

@ -0,0 +1,478 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <ev.h>
#include <evnet.h>
#if EVNET_HAVE_GNUTLS
# include <gnutls/gnutls.h>
#endif
static const struct addrinfo tcp_hints =
/* ai_flags */ { .ai_flags = 0
/* ai_family */ , .ai_family = AF_UNSPEC
/* ai_socktype */ , .ai_socktype = SOCK_STREAM
, 0
};
#define MARK_PROGRESS write(STDERR_FILENO, ".", 1)
#define HOST "127.0.0.1"
#define SOCKFILE "/tmp/oi.sock"
#define PORT "5000"
static evnet_server server;
static int nconnections;
static int use_tls;
static int got_server_close;
static void
common_on_server_close (evnet_server *server, int errorno)
{
assert(server);
assert(errorno == 0);
got_server_close = 1;
}
static void
common_on_peer_close (evnet_socket *socket)
{
assert(socket->errorno == 0);
printf("server connection closed\n");
#if EVNET_HAVE_GNUTLS
assert(socket->gnutls_errorno == 0);
if (use_tls) gnutls_deinit(socket->session);
#endif
free(socket);
}
static void
common_on_client_timeout (evnet_socket *socket)
{
assert(socket);
printf("client connection timeout\n");
}
static void
common_on_peer_timeout (evnet_socket *socket)
{
assert(socket);
fprintf(stderr, "peer connection timeout\n");
assert(0);
}
#if EVNET_HAVE_GNUTLS
#define DH_BITS 768
static gnutls_anon_server_credentials_t server_credentials;
const int kx_prio[] = { GNUTLS_KX_ANON_DH, 0 };
static gnutls_dh_params_t dh_params;
void anon_tls_server (evnet_socket *socket)
{
gnutls_session_t session;
socket->data = session;
int r = gnutls_init(&session, GNUTLS_SERVER);
assert(r == 0);
gnutls_set_default_priority(session);
gnutls_kx_set_priority (session, kx_prio);
gnutls_credentials_set(session, GNUTLS_CRD_ANON, server_credentials);
gnutls_dh_set_prime_bits(session, DH_BITS);
evnet_socket_set_secure_session(socket, session);
}
void anon_tls_client (evnet_socket *socket)
{
gnutls_session_t client_session;
gnutls_anon_client_credentials_t client_credentials;
gnutls_anon_allocate_client_credentials (&client_credentials);
gnutls_init(&client_session, GNUTLS_CLIENT);
gnutls_set_default_priority(client_session);
gnutls_kx_set_priority(client_session, kx_prio);
/* Need to enable anonymous KX specifically. */
gnutls_credentials_set(client_session, GNUTLS_CRD_ANON, client_credentials);
evnet_socket_set_secure_session(socket, client_session);
assert(socket->secure);
}
#endif // EVNET_HAVE_GNUTLS
#define PING "PING"
#define PONG "PONG"
#define EXCHANGES 5000
#define PINGPONG_TIMEOUT 5.0
static int successful_ping_count;
static void
pingpong_on_peer_read (evnet_socket *socket, const void *base, size_t len)
{
if (len == 0) {
evnet_socket_close(socket);
return;
}
char buf[2000];
strncpy(buf, base, len);
buf[len] = 0;
printf("server got message: %s\n", buf);
evnet_socket_write_simple(socket, PONG, sizeof PONG);
}
static void
pingpong_on_client_close (evnet_socket *socket)
{
assert(socket);
printf("client connection closed\n");
evnet_server_close(&server);
}
static evnet_socket*
pingpong_on_server_connection (evnet_server *_server, struct sockaddr *addr)
{
assert(_server == &server);
assert(addr);
evnet_socket *socket = malloc(sizeof(evnet_socket));
evnet_socket_init(socket, PINGPONG_TIMEOUT);
socket->on_read = pingpong_on_peer_read;
socket->on_close = common_on_peer_close;
socket->on_timeout = common_on_peer_timeout;
nconnections++;
#if EVNET_HAVE_GNUTLS
if (use_tls) anon_tls_server(socket);
#endif
printf("on server connection\n");
return socket;
}
static void
pingpong_on_client_connect (evnet_socket *socket)
{
printf("client connected. sending ping\n");
evnet_socket_write_simple(socket, PING, sizeof PING);
}
static void
pingpong_on_client_read (evnet_socket *socket, const void *base, size_t len)
{
if(len == 0) {
evnet_socket_close(socket);
return;
}
assert(len = strlen(PONG));
char buf[len+1];
strncpy(buf, base, len);
buf[len] = 0;
printf("client got message: %s\n", buf);
assert(strcmp(buf, PONG) == 0);
if (++successful_ping_count > EXCHANGES) {
evnet_socket_close(socket);
return;
}
if (successful_ping_count % (EXCHANGES/20) == 0) MARK_PROGRESS;
evnet_socket_write_simple(socket, PING, sizeof PING);
}
int
pingpong (struct addrinfo *servinfo)
{
int r;
evnet_socket client;
successful_ping_count = 0;
nconnections = 0;
got_server_close = 0;
printf("sizeof(evnet_server): %d\n", sizeof(evnet_server));
printf("sizeof(evnet_socket): %d\n", sizeof(evnet_socket));
evnet_server_init(&server);
server.on_connection = pingpong_on_server_connection;
server.on_close = common_on_server_close;
r = evnet_server_listen(&server, servinfo, 10);
assert(r == 0);
evnet_server_attach(EV_DEFAULT_ &server);
evnet_socket_init(&client, PINGPONG_TIMEOUT);
client.on_read = pingpong_on_client_read;
client.on_connect = pingpong_on_client_connect;
client.on_close = pingpong_on_client_close;
client.on_timeout = common_on_client_timeout;
#if EVNET_HAVE_GNUTLS
if (use_tls) anon_tls_client(&client);
#endif
r = evnet_socket_connect(&client, servinfo);
assert(r == 0 && "problem connecting");
evnet_socket_attach(EV_DEFAULT_ &client);
ev_loop(EV_DEFAULT_ 0);
printf("successful_ping_count = %d\n", successful_ping_count);
assert(successful_ping_count == EXCHANGES + 1);
assert(nconnections == 1);
assert(got_server_close);
return 0;
}
#define NCONN 100
#define CONNINT_TIMEOUT 1000.0
static void
connint_on_peer_read(evnet_socket *socket, const void *base, size_t len)
{
assert(base);
assert(len == 0);
evnet_socket_write_simple(socket, "BYE", 3);
printf("server wrote bye\n");
}
static void
connint_on_peer_drain(evnet_socket *socket)
{
evnet_socket_close(socket);
}
static evnet_socket*
connint_on_server_connection(evnet_server *_server, struct sockaddr *addr)
{
assert(_server == &server);
assert(addr);
evnet_socket *socket = malloc(sizeof(evnet_socket));
evnet_socket_init(socket, CONNINT_TIMEOUT);
socket->on_read = connint_on_peer_read;
socket->on_drain = connint_on_peer_drain;
socket->on_close = common_on_peer_close;
socket->on_timeout = common_on_peer_timeout;
#if EVNET_HAVE_GNUTLS
if (use_tls) anon_tls_server(socket);
#endif
printf("on server connection\n");
return socket;
}
static void
connint_on_client_connect (evnet_socket *socket)
{
printf("on client connection\n");
evnet_socket_close(socket);
}
static void
connint_on_client_close (evnet_socket *socket)
{
evnet_socket_close(socket); // already closed, but it shouldn't crash if we try to do it again
printf("client connection closed\n");
if (nconnections % (NCONN/20) == 0) MARK_PROGRESS;
if(++nconnections == NCONN) {
evnet_server_close(&server);
printf("closing server\n");
}
}
static void
connint_on_client_read (evnet_socket *socket, const void *base, size_t len)
{
if (len == 0) {
evnet_socket_close(socket);
return;
}
char buf[200000];
strncpy(buf, base, len);
buf[len] = 0;
printf("client got message: %s\n", buf);
assert(strcmp(buf, "BYE") == 0);
evnet_socket_close(socket);
}
int
connint (struct addrinfo *servinfo)
{
int r;
nconnections = 0;
got_server_close = 0;
evnet_server_init(&server);
server.on_connection = connint_on_server_connection;
server.on_close = common_on_server_close;
evnet_server_listen(&server, servinfo, 1000);
evnet_server_attach(EV_DEFAULT_ &server);
evnet_socket clients[NCONN];
int i;
for (i = 0; i < NCONN; i++) {
evnet_socket *client = &clients[i];
evnet_socket_init(client, CONNINT_TIMEOUT);
client->on_read = connint_on_client_read;
client->on_connect = connint_on_client_connect;
client->on_close = connint_on_client_close;
client->on_timeout = common_on_client_timeout;
#if EVNET_HAVE_GNUTLS
if (use_tls) anon_tls_client(client);
#endif
r = evnet_socket_connect(client, servinfo);
assert(r == 0 && "problem connecting");
evnet_socket_attach(EV_DEFAULT_ client);
}
ev_loop(EV_DEFAULT_ 0);
assert(nconnections == NCONN);
assert(got_server_close);
return 0;
}
struct addrinfo *
create_tcp_address (void)
{
struct addrinfo *servinfo;
int r = getaddrinfo(NULL, PORT, &tcp_hints, &servinfo);
assert(r == 0);
return servinfo;
}
void
free_tcp_address (struct addrinfo *servinfo)
{
freeaddrinfo(servinfo);
}
struct addrinfo *
create_unix_address (void)
{
struct addrinfo *servinfo;
struct stat tstat;
if (lstat(SOCKFILE, &tstat) == 0) {
assert(S_ISSOCK(tstat.st_mode));
unlink(SOCKFILE);
}
servinfo = malloc(sizeof(struct addrinfo));
servinfo->ai_family = AF_UNIX;
servinfo->ai_socktype = SOCK_STREAM;
servinfo->ai_protocol = 0;
struct sockaddr_un *sockaddr = calloc(sizeof(struct sockaddr_un), 1);
sockaddr->sun_family = AF_UNIX;
strcpy(sockaddr->sun_path, SOCKFILE);
servinfo->ai_addr = (struct sockaddr*)sockaddr;
servinfo->ai_addrlen = sizeof(struct sockaddr_un);
return servinfo;
}
void
free_unix_address (struct addrinfo *servinfo)
{
free(servinfo->ai_addr);
free(servinfo);
}
int
main (void)
{
#if EVNET_HAVE_GNUTLS
gnutls_global_init();
gnutls_dh_params_init (&dh_params);
fsync((int)stderr);
gnutls_dh_params_generate2 (dh_params, DH_BITS);
gnutls_anon_allocate_server_credentials (&server_credentials);
gnutls_anon_set_server_dh_params (server_credentials, dh_params);
#endif
struct addrinfo *tcp_address = create_tcp_address();
struct addrinfo *unix_address;
use_tls = 0;
assert(pingpong(tcp_address) == 0);
assert(connint(tcp_address) == 0);
#if EVNET_HAVE_GNUTLS
use_tls = 1;
assert(pingpong(tcp_address) == 0);
assert(connint(tcp_address) == 0);
#endif
use_tls = 0;
unix_address = create_unix_address();
assert(pingpong(unix_address) == 0);
free_unix_address(unix_address);
unix_address = create_unix_address();
assert(connint(unix_address) == 0);
free_unix_address(unix_address);
#if EVNET_HAVE_GNUTLS
use_tls = 1;
unix_address = create_unix_address();
assert(pingpong(unix_address) == 0);
free_unix_address(unix_address);
unix_address = create_unix_address();
assert(connint(unix_address) == 0);
free_unix_address(unix_address);
#endif
free_tcp_address(tcp_address);
return 0;
}

6
deps/liboi/test/timeout.rb → deps/evnet/test/timeout.rb

@ -1,20 +1,22 @@
#!/usr/bin/env ruby
require 'socket'
def test(description)
pid = fork do
exec(File.dirname(__FILE__) + "/echo")
end
puts "#{description}: "
begin
sleep 0.5 # give time for the server to start
yield(pid)
rescue
puts "\033[1;31mFAIL\033[m: #{description}"
puts "\033[1;31mFAIL\033[m"
raise $!
ensure
`kill -9 #{pid}`
end
puts "\033[1;32mPASS\033[m: #{description}"
puts "\033[1;32mPASS\033[m"
end
test("make sure echo server works") do

216
deps/liboi/oi_socket.h

@ -1,216 +0,0 @@
/* Copyright (c) 2008,2009 Ryan Dahl
*
* oi_queue comes from ngx_queue.h
* Copyright (C) 2002-2009 Igor Sysoev
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <netdb.h>
#include <ev.h>
#include <stddef.h> /* offsetof() */
#ifndef oi_socket_h
#define oi_socket_h
#ifdef __cplusplus
extern "C" {
#endif
#ifndef HAVE_GNUTLS
# define HAVE_GNUTLS 0
#endif
#if HAVE_GNUTLS
# include <gnutls/gnutls.h>
#endif
typedef struct oi_queue oi_queue;
struct oi_queue {
oi_queue *prev;
oi_queue *next;
};
#define oi_queue_init(q) \
(q)->prev = q; \
(q)->next = q
#define oi_queue_empty(h) \
(h == (h)->prev)
#define oi_queue_insert_head(h, x) \
(x)->next = (h)->next; \
(x)->next->prev = x; \
(x)->prev = h; \
(h)->next = x
#define oi_queue_head(h) \
(h)->next
#define oi_queue_last(h) \
(h)->prev
#define oi_queue_remove(x) \
(x)->next->prev = (x)->prev; \
(x)->prev->next = (x)->next; \
(x)->prev = NULL; \
(x)->next = NULL
#define oi_queue_data(q, type, link) \
(type *) ((unsigned char *) q - offsetof(type, link))
typedef struct oi_buf oi_buf;
typedef struct oi_server oi_server;
typedef struct oi_socket oi_socket;
oi_buf * oi_buf_new (const char* base, size_t len);
oi_buf * oi_buf_new2 (size_t len);
void oi_buf_destroy (oi_buf *);
void oi_server_init (oi_server *, int backlog);
int oi_server_listen (oi_server *, struct addrinfo *addrinfo);
void oi_server_attach (EV_P_ oi_server *);
void oi_server_detach (oi_server *);
void oi_server_close (oi_server *);
void oi_socket_init (oi_socket *, float timeout);
int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo);
void oi_socket_attach (EV_P_ oi_socket *);
void oi_socket_detach (oi_socket *);
void oi_socket_read_start (oi_socket *);
void oi_socket_read_stop (oi_socket *);
/* Resets the timeout to stay alive for another socket->timeout seconds
*/
void oi_socket_reset_timeout (oi_socket *);
/* Writes a buffer to the socket.
* (Do not send a NULL oi_buf or a buffer with oi_buf->base == NULL.)
*/
void oi_socket_write (oi_socket *, oi_buf *);
void oi_socket_write_simple (oi_socket *, const char *str, size_t len);
/* Once the write buffer is drained, oi_socket_close will shutdown the
* writing end of the socket and will close the read end once the server
* replies with an EOF.
*/
void oi_socket_close (oi_socket *);
/* Do not wait for the server to reply with EOF. This will only be called
* once the write buffer is drained.
* Warning: For TCP socket, the OS kernel may (should) reply with RST
* packets if this is called when data is still being received from the
* server.
*/
void oi_socket_full_close (oi_socket *);
/* The most extreme measure.
* Will not wait for the write queue to complete.
*/
void oi_socket_force_close (oi_socket *);
#if HAVE_GNUTLS
/* Tells the socket to use transport layer security (SSL). oi_socket does
* not want to make any decisions about security requirements, so the
* majoirty of GnuTLS configuration is left to the user. Only the transport
* layer of GnuTLS is controlled by oi_socket. That is, do not use
* gnutls_transport_* functions. Do use the rest of GnuTLS's API.
*/
void oi_socket_set_secure_session (oi_socket *, gnutls_session_t);
#endif
struct oi_buf {
/* public */
char *base;
size_t len;
void (*release) (oi_buf *); /* called when oi is done with the object */
void *data;
/* private */
size_t written;
oi_queue queue;
};
struct oi_server {
/* read only */
int fd;
int backlog;
#if EV_MULTIPLICITY
struct ev_loop *loop;
#endif
unsigned attached:1;
unsigned listening:1;
/* private */
ev_io connection_watcher;
/* public */
oi_socket* (*on_connection) (oi_server *, struct sockaddr *remote_addr, socklen_t remove_addr_len);
void (*on_error) (oi_server *);
void *data;
};
struct oi_socket {
/* read only */
int fd;
#if EV_MULTIPLICITY
struct ev_loop *loop;
#endif
oi_server *server;
oi_queue out_stream;
size_t written;
unsigned attached:1;
unsigned connected:1;
unsigned secure:1;
unsigned got_full_close:1;
unsigned got_half_close:1;
/* NULL = that end of the socket is closed. */
int (*read_action) (oi_socket *);
int (*write_action) (oi_socket *);
/* ERROR CODES. 0 = no error. Check on_close. */
int errorno;
#if HAVE_GNUTLS
int gnutls_errorno;
#endif
/* private */
ev_io write_watcher;
ev_io read_watcher;
ev_timer timeout_watcher;
#if HAVE_GNUTLS
gnutls_session_t session;
#endif
/* public */
size_t chunksize; /* the maximum chunk that on_read() will return */
void (*on_connect) (oi_socket *);
void (*on_read) (oi_socket *, const void *buf, size_t count);
void (*on_drain) (oi_socket *);
void (*on_close) (oi_socket *);
void (*on_timeout) (oi_socket *);
void *data;
};
#ifdef __cplusplus
}
#endif
#endif /* oi_socket_h */

106
deps/liboi/test/common.c

@ -1,106 +0,0 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <ev.h>
#include <oi_socket.h>
#include <gnutls/gnutls.h>
#define HOST "127.0.0.1"
#define SOCKFILE "/tmp/oi.sock"
#define PORT "5000"
int nconnections;
static void
on_peer_close(oi_socket *socket)
{
assert(socket->errorno == 0);
//printf("server connection closed\n");
#if HAVE_GNUTLS
assert(socket->gnutls_errorno == 0);
#if SECURE
gnutls_deinit(socket->session);
#endif
#endif
free(socket);
}
static void
on_client_timeout(oi_socket *socket)
{
printf("client connection timeout\n");
assert(0);
}
static void
on_peer_timeout(oi_socket *socket)
{
fprintf(stderr, "peer connection timeout\n");
assert(0);
}
#if HAVE_GNUTLS
#if SECURE
#define DH_BITS 768
gnutls_anon_server_credentials_t server_credentials;
const int kx_prio[] = { GNUTLS_KX_ANON_DH, 0 };
static gnutls_dh_params_t dh_params;
void anon_tls_init()
{
gnutls_global_init();
gnutls_dh_params_init (&dh_params);
fprintf(stderr, "..");
fsync((int)stderr);
gnutls_dh_params_generate2 (dh_params, DH_BITS);
fprintf(stderr, ".");
gnutls_anon_allocate_server_credentials (&server_credentials);
gnutls_anon_set_server_dh_params (server_credentials, dh_params);
}
void anon_tls_server(oi_socket *socket)
{
gnutls_session_t session;
socket->data = session;
int r = gnutls_init(&session, GNUTLS_SERVER);
assert(r == 0);
gnutls_set_default_priority(session);
gnutls_kx_set_priority (session, kx_prio);
gnutls_credentials_set(session, GNUTLS_CRD_ANON, server_credentials);
gnutls_dh_set_prime_bits(session, DH_BITS);
oi_socket_set_secure_session(socket, session);
}
void anon_tls_client(oi_socket *socket)
{
gnutls_session_t client_session;
gnutls_anon_client_credentials_t client_credentials;
gnutls_anon_allocate_client_credentials (&client_credentials);
gnutls_init (&client_session, GNUTLS_CLIENT);
gnutls_set_default_priority(client_session);
gnutls_kx_set_priority(client_session, kx_prio);
/* Need to enable anonymous KX specifically. */
gnutls_credentials_set (client_session, GNUTLS_CRD_ANON, client_credentials);
oi_socket_set_secure_session(socket, client_session);
assert(socket->secure);
}
#endif // SECURE
#endif // HAVE_GNUTLS

154
deps/liboi/test/connection_interruption.c

@ -1,154 +0,0 @@
#include "test/common.c"
#define NCONN 100
#define TIMEOUT 1000.0
static oi_server server;
static void
on_peer_read(oi_socket *socket, const void *base, size_t len)
{
assert(len == 0);
oi_socket_write_simple(socket, "BYE", 3);
//printf("server wrote bye\n");
}
static void
on_peer_drain(oi_socket *socket)
{
oi_socket_close(socket);
}
static oi_socket*
on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
{
oi_socket *socket = malloc(sizeof(oi_socket));
oi_socket_init(socket, TIMEOUT);
socket->on_read = on_peer_read;
socket->on_drain = on_peer_drain;
socket->on_close = on_peer_close;
socket->on_timeout = on_peer_timeout;
#if HAVE_GNUTLS
# if SECURE
anon_tls_server(socket);
# endif
#endif
//printf("on server connection\n");
return socket;
}
static void
on_client_connect(oi_socket *socket)
{
//printf("on client connection\n");
oi_socket_close(socket);
}
static void
on_client_close(oi_socket *socket)
{
oi_socket_close(socket); // already closed, but it shouldn't crash if we try to do it again
//printf("client connection closed\n");
if(++nconnections == NCONN) {
oi_server_detach(&server);
//printf("detaching server\n");
}
}
static void
on_client_read(oi_socket *socket, const void *base, size_t len)
{
if (len == 0) {
oi_socket_close(socket);
return;
}
char buf[200000];
strncpy(buf, base, len);
buf[len] = 0;
//printf("client got message: %s\n", buf);
if (strcmp(buf, "BYE") == 0) {
oi_socket_close(socket);
} else {
assert(0);
}
}
int
main(int argc, const char *argv[])
{
int r;
oi_server_init(&server, 1000);
server.on_connection = on_server_connection;
#if HAVE_GNUTLS
# if SECURE
anon_tls_init();
# endif
#endif
struct addrinfo *servinfo;
struct addrinfo hints;
memset(&hints, 0, sizeof hints);
#if TCP
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
r = getaddrinfo(NULL, PORT, &hints, &servinfo);
assert(r == 0);
#else
struct stat tstat;
if (lstat(SOCKFILE, &tstat) == 0) {
if (S_ISSOCK(tstat.st_mode))
unlink(SOCKFILE);
}
servinfo = malloc(sizeof(struct addrinfo));
servinfo->ai_family = AF_UNIX;
servinfo->ai_socktype = SOCK_STREAM;
servinfo->ai_protocol = 0;
struct sockaddr_un *sockaddr = calloc(sizeof(struct sockaddr_un), 1);
sockaddr->sun_family = AF_UNIX;
strcpy(sockaddr->sun_path, SOCKFILE);
servinfo->ai_addr = (struct sockaddr*)sockaddr;
servinfo->ai_addrlen = sizeof(struct sockaddr_un);
#endif
oi_server_listen(&server, servinfo);
oi_server_attach(EV_DEFAULT_ &server);
int i;
for(i = 0; i < NCONN; i++) {
oi_socket *client = malloc(sizeof(oi_socket));
oi_socket_init(client, TIMEOUT);
client->on_read = on_client_read;
client->on_connect = on_client_connect;
client->on_close = on_client_close;
client->on_timeout = on_client_timeout;
#if HAVE_GNUTLS
#if SECURE
anon_tls_client(client);
#endif
#endif
r = oi_socket_connect(client, servinfo);
assert(r == 0 && "problem connecting");
oi_socket_attach(EV_DEFAULT_ client);
}
ev_loop(EV_DEFAULT_ 0);
assert(nconnections == NCONN);
#if TCP
freeaddrinfo(servinfo);
#endif
return 0;
}

97
deps/liboi/test/echo.c

@ -1,97 +0,0 @@
#include "test/common.c"
// timeout must match the timeout in timeout.rb
#define TIMEOUT 5.0
int successful_ping_count;
static void
on_peer_read(oi_socket *socket, const void *base, size_t len)
{
if(len == 0)
return;
oi_socket_write_simple(socket, base, len);
}
static oi_socket*
on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
{
oi_socket *socket = malloc(sizeof(oi_socket));
oi_socket_init(socket, TIMEOUT);
socket->on_read = on_peer_read;
socket->on_close = on_peer_close;
socket->on_timeout = on_peer_timeout;
nconnections++;
#if HAVE_GNUTLS
# if SECURE
anon_tls_server(socket);
# endif
#endif
//printf("on server connection\n");
return socket;
}
int
main(int argc, const char *argv[])
{
int r;
oi_server server;
oi_socket client;
//printf("sizeof(oi_server): %d\n", sizeof(oi_server));
//printf("sizeof(oi_socket): %d\n", sizeof(oi_socket));
oi_server_init(&server, 10);
server.on_connection = on_server_connection;
#if HAVE_GNUTLS
# if SECURE
anon_tls_init();
# endif
#endif
struct addrinfo *servinfo;
struct addrinfo hints;
memset(&hints, 0, sizeof hints);
#if TCP
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
r = getaddrinfo(NULL, PORT, &hints, &servinfo);
assert(r == 0);
#else
struct stat tstat;
if (lstat(SOCKFILE, &tstat) == 0) {
if (S_ISSOCK(tstat.st_mode))
unlink(SOCKFILE);
}
servinfo = malloc(sizeof(struct addrinfo));
servinfo->ai_family = AF_UNIX;
servinfo->ai_socktype = SOCK_STREAM;
servinfo->ai_protocol = 0;
struct sockaddr_un *sockaddr = calloc(sizeof(struct sockaddr_un), 1);
sockaddr->sun_family = AF_UNIX;
strcpy(sockaddr->sun_path, SOCKFILE);
servinfo->ai_addr = (struct sockaddr*)sockaddr;
servinfo->ai_addrlen = sizeof(struct sockaddr_un);
#endif
r = oi_server_listen(&server, servinfo);
assert(r == 0);
oi_server_attach(EV_DEFAULT_ &server);
ev_loop(EV_DEFAULT_ 0);
#if TCP
freeaddrinfo(servinfo);
#endif
return 0;
}

164
deps/liboi/test/ping_pong.c

@ -1,164 +0,0 @@
#include "test/common.c"
#define PING "PING"
#define PONG "PONG"
#define EXCHANGES 500
#define TIMEOUT 5.0
int successful_ping_count;
static void
on_peer_read(oi_socket *socket, const void *base, size_t len)
{
if (len == 0) {
oi_socket_close(socket);
return;
}
char buf[2000];
strncpy(buf, base, len);
buf[len] = 0;
//printf("server got message: %s\n", buf);
oi_socket_write_simple(socket, PONG, sizeof PONG);
}
static void
on_client_close(oi_socket *socket)
{
//printf("client connection closed\n");
ev_unloop(EV_DEFAULT_ EVUNLOOP_ALL);
}
static oi_socket*
on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
{
oi_socket *socket = malloc(sizeof(oi_socket));
oi_socket_init(socket, TIMEOUT);
socket->on_read = on_peer_read;
socket->on_close = on_peer_close;
socket->on_timeout = on_peer_timeout;
nconnections++;
#if HAVE_GNUTLS
# if SECURE
anon_tls_server(socket);
# endif
#endif
//printf("on server connection\n");
return socket;
}
static void
on_client_connect (oi_socket *socket)
{
//printf("client connected. sending ping\n");
oi_socket_write_simple(socket, PING, sizeof PING);
}
static void
on_client_read (oi_socket *socket, const void *base, size_t len)
{
if(len == 0) {
oi_socket_close(socket);
return;
}
char buf[200000];
strncpy(buf, base, len);
buf[len] = 0;
//printf("client got message: %s\n", buf);
if(strcmp(buf, PONG) == 0) {
if(++successful_ping_count > EXCHANGES) {
oi_socket_close(socket);
return;
}
oi_socket_write_simple(socket, PING, sizeof PING);
} else {
assert(0);
}
}
int
main(int argc, const char *argv[])
{
int r;
oi_server server;
oi_socket client;
//printf("sizeof(oi_server): %d\n", sizeof(oi_server));
//printf("sizeof(oi_socket): %d\n", sizeof(oi_socket));
oi_server_init(&server, 10);
server.on_connection = on_server_connection;
#if HAVE_GNUTLS
# if SECURE
anon_tls_init();
# endif
#endif
struct addrinfo *servinfo;
struct addrinfo hints;
memset(&hints, 0, sizeof hints);
#if TCP
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
r = getaddrinfo(NULL, PORT, &hints, &servinfo);
assert(r == 0);
#else
struct stat tstat;
if (lstat(SOCKFILE, &tstat) == 0) {
if (S_ISSOCK(tstat.st_mode))
unlink(SOCKFILE);
}
servinfo = malloc(sizeof(struct addrinfo));
servinfo->ai_family = AF_UNIX;
servinfo->ai_socktype = SOCK_STREAM;
servinfo->ai_protocol = 0;
struct sockaddr_un *sockaddr = calloc(sizeof(struct sockaddr_un), 1);
sockaddr->sun_family = AF_UNIX;
strcpy(sockaddr->sun_path, SOCKFILE);
servinfo->ai_addr = (struct sockaddr*)sockaddr;
servinfo->ai_addrlen = sizeof(struct sockaddr_un);
#endif
r = oi_server_listen(&server, servinfo);
assert(r == 0);
oi_server_attach(EV_DEFAULT_ &server);
oi_socket_init(&client, TIMEOUT);
client.on_read = on_client_read;
client.on_connect = on_client_connect;
client.on_close = on_client_close;
client.on_timeout = on_client_timeout;
#if HAVE_GNUTLS
# if SECURE
anon_tls_client(&client);
# endif
#endif
r = oi_socket_connect(&client, servinfo);
assert(r == 0 && "problem connecting");
oi_socket_attach(EV_DEFAULT_ &client);
ev_loop(EV_DEFAULT_ 0);
assert(successful_ping_count == EXCHANGES + 1);
assert(nconnections == 1);
#if TCP
freeaddrinfo(servinfo);
#endif
return 0;
}

16
src/net.cc

@ -101,7 +101,7 @@ Connection::Init (void)
{
opening = false;
double timeout = 60.0; // default
oi_socket_init(&socket_, timeout);
evnet_socket_init(&socket_, timeout);
socket_.on_connect = Connection::on_connect;
socket_.on_read = Connection::on_read;
socket_.on_drain = Connection::on_drain;
@ -267,13 +267,13 @@ Connection::AfterResolve (eio_req *req)
// no error. return.
if (r == 0 && req->result == 0) {
oi_socket_attach (EV_DEFAULT_UC_ &connection->socket_);
evnet_socket_attach (EV_DEFAULT_UC_ &connection->socket_);
goto out;
}
/* RESOLVE ERROR */
/* TODO: the whole resolve process should be moved into oi_socket.
/* TODO: the whole resolve process should be moved into evnet_socket.
* The fact that I'm modifying a read-only variable here should be
* good evidence of this.
*/
@ -368,7 +368,7 @@ Connection::Send (const Arguments& args)
// XXX
// A lot of improvement can be made here. First of all we're allocating
// oi_bufs for every send which is clearly inefficent - it should use a
// evnet_bufs for every send which is clearly inefficent - it should use a
// memory pool or ring buffer. Of course, expressing binary data as an
// array of integers is extremely inefficent. This can improved when v8
// bug 270 (http://code.google.com/p/v8/issues/detail?id=270) has been
@ -378,7 +378,7 @@ Connection::Send (const Arguments& args)
enum encoding enc = ParseEncoding(args[1]);
Local<String> s = args[0]->ToString();
size_t len = s->Utf8Length();
oi_buf *buf = node::buf_new(len);
evnet_buf *buf = node::buf_new(len);
switch (enc) {
case RAW:
case ASCII:
@ -397,7 +397,7 @@ Connection::Send (const Arguments& args)
} else if (args[0]->IsArray()) {
Handle<Array> array = Handle<Array>::Cast(args[0]);
size_t len = array->Length();
oi_buf *buf = node::buf_new(len);
evnet_buf *buf = node::buf_new(len);
for (size_t i = 0; i < len; i++) {
Local<Value> int_value = array->Get(Integer::New(i));
buf->base[i] = int_value->IntegerValue();
@ -517,12 +517,10 @@ Server::UnwrapConnection (Local<Object> connection)
}
Connection*
Server::OnConnection (struct sockaddr *addr, socklen_t len)
Server::OnConnection (struct sockaddr *addr)
{
HandleScope scope;
assert(len > 0); // just to get rid of the warning.
TryCatch try_catch;
Local<Object> js_connection =

42
src/net.h

@ -4,7 +4,7 @@
#include "node.h"
#include "events.h"
#include <v8.h>
#include <oi_socket.h>
#include <evnet.h>
namespace node {
@ -41,12 +41,12 @@ protected:
virtual ~Connection (void);
int Connect (struct addrinfo *address) {
return oi_socket_connect (&socket_, address);
return evnet_socket_connect (&socket_, address);
}
void Send (oi_buf *buf) { oi_socket_write(&socket_, buf); }
void Close (void) { oi_socket_close(&socket_); }
void FullClose (void) { oi_socket_full_close(&socket_); }
void ForceClose (void) { oi_socket_force_close(&socket_); }
void Send (evnet_buf *buf) { evnet_socket_write(&socket_, buf); }
void Close (void) { evnet_socket_close(&socket_); }
void FullClose (void) { evnet_socket_full_close(&socket_); }
void ForceClose (void) { evnet_socket_force_close(&socket_); }
virtual void OnConnect (void);
virtual void OnReceive (const void *buf, size_t len);
@ -66,12 +66,12 @@ protected:
private:
/* liboi callbacks */
static void on_connect (oi_socket *s) {
static void on_connect (evnet_socket *s) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnConnect();
}
static void on_read (oi_socket *s, const void *buf, size_t len) {
static void on_read (evnet_socket *s, const void *buf, size_t len) {
Connection *connection = static_cast<Connection*> (s->data);
if (len == 0)
connection->OnEOF();
@ -79,12 +79,12 @@ private:
connection->OnReceive(buf, len);
}
static void on_drain (oi_socket *s) {
static void on_drain (evnet_socket *s) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnDrain();
}
static void on_close (oi_socket *s) {
static void on_close (evnet_socket *s) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnDisconnect();
@ -96,7 +96,7 @@ private:
connection->Detach();
}
static void on_timeout (oi_socket *s) {
static void on_timeout (evnet_socket *s) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnTimeout();
}
@ -107,7 +107,7 @@ private:
static int AfterResolve (eio_req *req);
char *host_;
char *port_;
oi_socket socket_;
evnet_socket socket_;
friend class Server;
};
@ -124,25 +124,25 @@ protected:
Server (void) : EventEmitter()
{
oi_server_init(&server_, 1024);
evnet_server_init(&server_);
server_.on_connection = Server::on_connection;
server_.data = this;
}
virtual ~Server () {
oi_server_close (&server_);
evnet_server_close (&server_);
}
int Listen (struct addrinfo *address) {
int r = oi_server_listen (&server_, address);
int r = evnet_server_listen (&server_, address, 1024);
if(r != 0) return r;
oi_server_attach (EV_DEFAULT_ &server_);
evnet_server_attach (EV_DEFAULT_ &server_);
Attach();
return 0;
}
void Close ( ) {
oi_server_close (&server_);
evnet_server_close (&server_);
Detach();
}
@ -150,14 +150,14 @@ protected:
virtual Connection* UnwrapConnection (v8::Local<v8::Object> connection);
private:
Connection* OnConnection (struct sockaddr *addr, socklen_t len);
static oi_socket* on_connection (oi_server *s, struct sockaddr *addr, socklen_t len) {
Connection* OnConnection (struct sockaddr *addr);
static evnet_socket* on_connection (evnet_server *s, struct sockaddr *addr) {
Server *server = static_cast<Server*> (s->data);
Connection *connection = server->OnConnection (addr, len);
Connection *connection = server->OnConnection (addr);
return &connection->socket_;
}
oi_server server_;
evnet_server server_;
};
} // namespace node

10
src/node.cc

@ -24,21 +24,21 @@ using namespace node;
using namespace std;
static void
buf_free (oi_buf *b)
buf_free (evnet_buf *b)
{
V8::AdjustAmountOfExternalAllocatedMemory(-b->len);
free(b);
}
oi_buf *
evnet_buf *
node::buf_new (size_t size)
{
size_t total = sizeof(oi_buf) + size;
size_t total = sizeof(evnet_buf) + size;
void *p = malloc(total);
if (p == NULL) return NULL;
oi_buf *b = static_cast<oi_buf*>(p);
b->base = static_cast<char*>(p) + sizeof(oi_buf);
evnet_buf *b = static_cast<evnet_buf*>(p);
b->base = static_cast<char*>(p) + sizeof(evnet_buf);
b->len = size;
b->release = buf_free;

4
src/node.h

@ -4,7 +4,7 @@
#include <ev.h>
#include <eio.h>
#include <v8.h>
#include <oi_socket.h>
#include <evnet.h>
#include "object_wrap.h"
@ -30,7 +30,7 @@ do { \
enum encoding {ASCII, UTF8, RAW};
enum encoding ParseEncoding (v8::Handle<v8::Value> encoding_v);
void FatalException (v8::TryCatch &try_catch);
oi_buf * buf_new (size_t size);
evnet_buf * buf_new (size_t size);
} // namespace node
#endif // node_h

28
src/process.cc

@ -100,13 +100,13 @@ Process::Write (const Arguments& args)
// XXX
// A lot of improvement can be made here. First of all we're allocating
// oi_bufs for every send which is clearly inefficent - it should use a
// evnet_bufs for every send which is clearly inefficent - it should use a
// memory pool or ring buffer. Of course, expressing binary data as an
// array of integers is extremely inefficent. This can improved when v8
// bug 270 (http://code.google.com/p/v8/issues/detail?id=270) has been
// addressed.
oi_buf *buf;
evnet_buf *buf;
size_t len;
if (args[0]->IsString()) {
@ -196,7 +196,7 @@ Process::Process ()
pid_ = 0;
oi_queue_init(&out_stream_);
evnet_queue_init(&out_stream_);
}
Process::~Process ()
@ -208,10 +208,10 @@ void
Process::Shutdown ()
{
// Clear the out_stream
while (!oi_queue_empty(&out_stream_)) {
oi_queue *q = oi_queue_last(&out_stream_);
oi_buf *buf = (oi_buf*) oi_queue_data(q, oi_buf, queue);
oi_queue_remove(q);
while (!evnet_queue_empty(&out_stream_)) {
evnet_queue *q = evnet_queue_last(&out_stream_);
evnet_buf *buf = (evnet_buf*) evnet_queue_data(q, evnet_buf, queue);
evnet_queue_remove(q);
if (buf->release) buf->release(buf);
}
@ -394,9 +394,9 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents)
assert(revents == EV_WRITE);
assert(process->stdin_pipe_[1] >= 0);
while (!oi_queue_empty(&process->out_stream_)) {
oi_queue *q = oi_queue_last(&process->out_stream_);
oi_buf *to_write = (oi_buf*) oi_queue_data(q, oi_buf, queue);
while (!evnet_queue_empty(&process->out_stream_)) {
evnet_queue *q = evnet_queue_last(&process->out_stream_);
evnet_buf *to_write = (evnet_buf*) evnet_queue_data(q, evnet_buf, queue);
sent = write( process->stdin_pipe_[1]
, to_write->base + to_write->written
@ -417,12 +417,12 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents)
to_write->written += sent;
if (to_write->written == to_write->len) {
oi_queue_remove(q);
evnet_queue_remove(q);
if (to_write->release) to_write->release(to_write);
}
}
if (oi_queue_empty(&process->out_stream_)) {
if (evnet_queue_empty(&process->out_stream_)) {
ev_io_stop(EV_DEFAULT_UC_ &process->stdin_watcher_);
if (process->got_close_) {
close(process->stdin_pipe_[1]);
@ -461,10 +461,10 @@ Process::OnCHLD (EV_P_ ev_child *watcher, int revents)
}
int
Process::Write (oi_buf *buf)
Process::Write (evnet_buf *buf)
{
if (STDIN_CLOSED || got_close_ || got_chld_) return -1;
oi_queue_insert_head(&out_stream_, &buf->queue);
evnet_queue_insert_head(&out_stream_, &buf->queue);
buf->written = 0;
ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
return 0;

6
src/process.h

@ -5,7 +5,7 @@
#include "events.h"
#include <v8.h>
#include <ev.h>
#include <oi_socket.h>
#include <evnet.h>
namespace node {
@ -26,7 +26,7 @@ class Process : EventEmitter {
~Process();
int Spawn (const char *command);
int Write (oi_buf *buf);
int Write (evnet_buf *buf);
int Close (void);
int Kill (int sig);
@ -54,7 +54,7 @@ class Process : EventEmitter {
bool got_chld_;
int exit_code_;
oi_queue out_stream_;
evnet_queue out_stream_;
};
} // namespace node

22
wscript

@ -112,16 +112,16 @@ def build(bld):
v8_debug.rule = v8rule % ( v8dir_src , deps_tgt , v8dir_tgt, scons, "debug")
v8_debug.target = join("deps/v8", bld.env["staticlib_PATTERN"] % "v8_g")
### oi
oi = bld.new_task_gen("cc", "staticlib")
oi.source = "deps/liboi/oi_socket.c"
oi.includes = "deps/liboi/ deps/libev/"
oi.name = "oi"
oi.target = "oi"
# oi.uselib = "GNUTLS"
oi.install_path = None
### evnet
evnet = bld.new_task_gen("cc", "staticlib")
evnet.source = "deps/evnet/evnet.c"
evnet.includes = "deps/evnet/ deps/libev/"
evnet.name = "evnet"
evnet.target = "evnet"
# evnet.uselib = "GNUTLS"
evnet.install_path = None
if bld.env["USE_DEBUG"]:
oi.clone("debug")
evnet.clone("debug")
### http_parser
http_parser = bld.new_task_gen("cc", "staticlib")
@ -173,10 +173,10 @@ def build(bld):
deps/v8/include
deps/libev
deps/libeio
deps/liboi
deps/evnet
deps/http_parser
"""
node.uselib_local = "oi ev eio http_parser"
node.uselib_local = "evnet ev eio http_parser"
node.uselib = "V8 EXECINFO PROFILER EFENCE"
node.install_path = '${PREFIX}/bin'
node.chmod = 0755

Loading…
Cancel
Save