Browse Source

Update liboi. Use EV_MULTIPLICITY=0.

This might need to be changed in the future if ev is needed in thread pools
or extension libraries. However for now it makes sense to just use a single
loop.
v0.7.4-release
Ryan 16 years ago
parent
commit
90fc8d3622
  1. 3
      deps/libev/wscript
  2. 9
      deps/liboi/config.mk
  3. 2
      deps/liboi/oi.h
  4. 486
      deps/liboi/oi_async.c
  5. 218
      deps/liboi/oi_async.h
  6. 25
      deps/liboi/oi_error.h
  7. 391
      deps/liboi/oi_file.c
  8. 58
      deps/liboi/oi_file.h
  9. 93
      deps/liboi/oi_socket.c
  10. 10
      deps/liboi/oi_socket.h
  11. 7
      deps/liboi/test/connection_interruption.c
  12. 5
      deps/liboi/test/echo.c
  13. 223
      deps/liboi/test/fancy_copy.c
  14. 9
      deps/liboi/test/ping_pong.c
  15. 54
      deps/liboi/test/sleeping_tasks.c
  16. 64
      deps/liboi/test/stdout.c
  17. 2
      src/http.cc
  18. 4
      src/net.cc
  19. 8
      src/node.cc
  20. 1
      src/node.h
  21. 4
      src/timers.cc

3
deps/libev/wscript

@ -46,6 +46,9 @@ def configure(conf):
conf.define("HAVE_CONFIG_H", 1)
conf.write_config_header('config.h')
conf.env.append_value('CCFLAGS', ['-DEV_MULTIPLICITY=0'])
conf.env.append_value('CXXFLAGS', ['-DEV_MULTIPLICITY=0'])
def build(bld):
libev = bld.new_task_gen("cc", "staticlib")
libev.source = 'ev.c'

9
deps/liboi/config.mk

@ -5,15 +5,6 @@ EVDIR=$(HOME)/local/libev
# Define GNUTLSDIR=/foo/bar if your gnutls header and library files are in
# /foo/bar/include and /foo/bar/lib directories.
#GNUTLSDIR=/usr
#
#
# Define NO_PREAD if you have a problem with pread() system call (e.g.
# cygwin.dll before v1.5.22).
#
#
# Define NO_SENDFILE if you have a problem with the sendfile() system call
#
#
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not')

2
deps/liboi/oi.h

@ -2,7 +2,5 @@
#define oi_h
#include <oi_socket.h>
#include <oi_async.h>
#include <oi_file.h>
#endif

486
deps/liboi/oi_async.c

@ -1,486 +0,0 @@
#include <stdlib.h> /* malloc() */
#include <stdio.h> /* perror() */
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h> /* read(), write() */
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <pthread.h>
#if HAVE_SENDFILE
# if __linux
# include <sys/sendfile.h>
# elif __freebsd
# include <sys/socket.h>
# include <sys/uio.h>
# elif __hpux
# include <sys/socket.h>
# elif __solaris /* not yet */
# include <sys/sendfile.h>
# else
# error sendfile support requested but not available
# endif
#endif
#include <ev.h>
#include <oi.h>
#define NWORKERS 4
/* TODO make adjustable
* once it is fix sleeping_tasks
*/
static int active_watchers = 0;
static int active_workers = 0;
static int readiness_pipe[2] = {-1, -1};
static oi_queue waiting_tasks;
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t finished_lock = PTHREAD_MUTEX_INITIALIZER;
struct worker {
oi_task *task;
pthread_t thread;
pthread_attr_t thread_attr;
};
/* Sendfile and pread emulation come from Marc Lehmann's libeio and are
* Copyright (C)2007,2008 Marc Alexander Lehmann.
* Many ideas of oi_async.* are taken from libeio and in fact, I plan to
* use libeio once it becomes usable for me. (The problem is issuing tasks
* from multiple threads.)
*/
#if !HAVE_PREADWRITE
/*
* make our pread/pwrite emulation safe against themselves, but not against
* normal read/write by using a mutex. slows down execution a lot,
* but that's your problem, not mine.
*/
static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;
#endif
#if !HAVE_PREADWRITE
# undef pread
# undef pwrite
# define pread eio__pread
# define pwrite eio__pwrite
static ssize_t
eio__pread (int fd, void *buf, size_t count, off_t offset)
{
ssize_t res;
off_t ooffset;
pthread_mutex_lock(&preadwritelock);
ooffset = lseek (fd, 0, SEEK_CUR);
lseek (fd, offset, SEEK_SET);
res = read (fd, buf, count);
lseek (fd, ooffset, SEEK_SET);
pthread_mutex_unlock(&preadwritelock);
return res;
}
static ssize_t
eio__pwrite (int fd, void *buf, size_t count, off_t offset)
{
ssize_t res;
off_t ooffset;
pthread_mutex_lock(&preadwritelock);
ooffset = lseek (fd, 0, SEEK_CUR);
lseek (fd, offset, SEEK_SET);
res = write (fd, buf, count);
lseek (fd, offset, SEEK_SET);
pthread_mutex_unlock(&preadwritelock);
return res;
}
#endif
/* sendfile always needs emulation */
static ssize_t
eio__sendfile (int ofd, int ifd, off_t offset, size_t count)
{
ssize_t res;
if (!count)
return 0;
#if HAVE_SENDFILE
# if __linux
res = sendfile (ofd, ifd, &offset, count);
# elif __freebsd
/*
* Of course, the freebsd sendfile is a dire hack with no thoughts
* wasted on making it similar to other I/O functions.
*/
{
off_t sbytes;
res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
if (res < 0 && sbytes)
/* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
res = sbytes;
}
# elif __hpux
res = sendfile (ofd, ifd, offset, count, 0, 0);
# elif __solaris
{
struct sendfilevec vec;
size_t sbytes;
vec.sfv_fd = ifd;
vec.sfv_flag = 0;
vec.sfv_off = offset;
vec.sfv_len = count;
res = sendfilev (ofd, &vec, 1, &sbytes);
if (res < 0 && sbytes)
res = sbytes;
}
# endif
#else
res = -1;
errno = ENOSYS;
#endif
if (res < 0
&& (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
#if __solaris
|| errno == EAFNOSUPPORT || errno == EPROTOTYPE
#endif
)
)
{
/* emulate sendfile. this is a major pain in the ass */
/* buffer size for various temporary buffers */
#define EIO_BUFSIZE 65536
char *eio_buf = malloc (EIO_BUFSIZE);
errno = ENOMEM;
if (!eio_buf)
return -1;
res = 0;
while (count) {
ssize_t cnt;
cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);
if (cnt <= 0) {
if (cnt && !res) res = -1;
break;
}
cnt = write (ofd, eio_buf, cnt);
if (cnt <= 0) {
if (cnt && !res) res = -1;
break;
}
offset += cnt;
res += cnt;
count -= cnt;
}
free(eio_buf);
}
return res;
}
static oi_task*
queue_shift(pthread_mutex_t *lock, oi_queue *queue)
{
oi_queue *last = NULL;
pthread_mutex_lock(lock);
if(!oi_queue_empty(queue)) {
last = oi_queue_last(queue);
oi_queue_remove(last);
}
pthread_mutex_unlock(lock);
if(last == NULL)
return NULL;
return oi_queue_data(last, oi_task, queue);
}
#define P1(name,a) { \
t->params.name.result = name( t->params.name.a ); \
break; \
}
#define P2(name,a,b) { \
t->params.name.result = name( t->params.name.a \
, t->params.name.b \
); \
break; \
}
#define P3(name,a,b,c) { \
t->params.name.result = name( t->params.name.a \
, t->params.name.b \
, t->params.name.c \
); \
break; \
}
#define P4(name,a,b,c,d) { \
t->params.name.result = name( t->params.name.a \
, t->params.name.b \
, t->params.name.c \
, t->params.name.d \
); \
break; \
}
static void
execute_task(oi_task *t)
{
errno = 0;
switch(t->type) {
case OI_TASK_OPEN: P3(open, pathname, flags, mode);
case OI_TASK_READ: P3(read, fd, buf, count);
case OI_TASK_WRITE: P3(write, fd, buf, count);
case OI_TASK_CLOSE: P1(close, fd);
case OI_TASK_SLEEP: P1(sleep, seconds);
case OI_TASK_SENDFILE: P4(eio__sendfile, out_fd, in_fd, offset, count);
case OI_TASK_GETADDRINFO: P4(getaddrinfo, nodename, servname, hints, res);
case OI_TASK_LSTAT: P2(lstat, path, buf);
default:
assert(0 && "unknown task type");
break;
}
t->errorno = errno;
}
static void
attempt_to_get_a_task(struct worker *worker)
{
char dummy;
assert(readiness_pipe[0] > 0);
int r = read(readiness_pipe[0], &dummy, 1);
if(r == -1 && (errno != EAGAIN || errno != EINTR)) {
perror("read(readiness_pipe[0])");
return;
}
// 1 pop task from queue
assert(worker->task == NULL);
oi_task *task = queue_shift(&queue_lock, &waiting_tasks);
if(task == NULL) return;
worker->task = task;
// 2 run task
execute_task(task);
// 3 notify complition
oi_async *async = task->async;
assert(async != NULL);
pthread_mutex_lock(&finished_lock);
oi_queue_insert_head(&async->finished_tasks, &task->queue);
pthread_mutex_unlock(&finished_lock);
ev_async_send(async->loop, &async->watcher);
worker->task = NULL;
/* attempt to pull another task */
return attempt_to_get_a_task(worker);
}
void *
worker_loop(void *data)
{
int r;
struct worker *worker = data;
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(readiness_pipe[0], &readfds);
active_workers++;
assert(active_workers <= NWORKERS);
while(1) {
r = select(1+readiness_pipe[0], &readfds, 0, 0, 0);
if(r == -1) break;
attempt_to_get_a_task(worker);
}
active_workers--;
return NULL;
}
static struct worker*
worker_new()
{
int r;
struct worker *worker = calloc(sizeof(struct worker), 1);
if(worker == NULL ) { return NULL; }
worker->task = NULL;
pthread_attr_setdetachstate(&worker->thread_attr, PTHREAD_CREATE_DETACHED);
r = pthread_create( &worker->thread
, NULL // &worker->thread_attr
, worker_loop
, worker
);
if(r != 0) {
/* TODO: error checking */
perror("pthread_create");
goto error;
}
return worker;
error:
free(worker);
return NULL;
}
static void
start_workers()
{
assert(readiness_pipe[0] == -1);
assert(readiness_pipe[1] == -1);
assert(active_workers == 0);
int r = pipe(readiness_pipe);
if(r < 0) {
perror("pipe()");
assert(0 && "TODO HANDLE ME");
}
/* set the write end non-blocking */
int flags = fcntl(readiness_pipe[1], F_GETFL, 0);
r = fcntl(readiness_pipe[1], F_SETFL, flags | O_NONBLOCK);
if(r < 0) {
assert(0 && "error setting pipe to non-blocking?");
/* TODO error report */
}
oi_queue_init(&waiting_tasks);
int i;
for(i = 0; i < NWORKERS; i++)
worker_new();
}
/*
static void
stop_workers()
{
assert(0 && "TODO implement me");
}
*/
static void
on_completion(struct ev_loop *loop, ev_async *watcher, int revents)
{
oi_async *async = watcher->data;
oi_task *task;
while((task = queue_shift(&finished_lock, &async->finished_tasks))) {
assert(task->active);
task->active = 0;
errno = task->errorno;
# define done_cb(kind) { \
assert(task->params.kind.cb); \
task->params.kind.cb(task, task->params.kind.result); \
break; \
}
switch(task->type) {
case OI_TASK_OPEN: done_cb(open);
case OI_TASK_READ: done_cb(read);
case OI_TASK_WRITE: done_cb(write);
case OI_TASK_CLOSE: done_cb(close);
case OI_TASK_SLEEP: done_cb(sleep);
case OI_TASK_SENDFILE: done_cb(eio__sendfile);
case OI_TASK_GETADDRINFO: done_cb(getaddrinfo);
case OI_TASK_LSTAT: done_cb(lstat);
}
/* the task is possibly freed by callback. do not access it again. */
}
}
void
oi_async_init (oi_async *async)
{
ev_async_init(&async->watcher, on_completion);
oi_queue_init(&async->finished_tasks);
oi_queue_init(&async->new_tasks);
async->watcher.data = async;
}
static void
dispatch_tasks(oi_async *async)
{
while(!oi_queue_empty(&async->new_tasks)) {
oi_queue *last = oi_queue_last(&async->new_tasks);
oi_queue_remove(last);
oi_task *task = oi_queue_data(last, oi_task, queue);
// 1. add task to task queue.
pthread_mutex_lock(&queue_lock);
oi_queue_insert_head(&waiting_tasks, &task->queue);
pthread_mutex_unlock(&queue_lock);
// 2. write byte to pipe
char dummy;
int written = write(readiness_pipe[1], &dummy, 1);
// 3. TODO make sure byte is written
assert(written == 1);
}
}
void
oi_async_attach (struct ev_loop *loop, oi_async *async)
{
if(active_watchers == 0 && active_workers == 0)
start_workers();
active_watchers++;
ev_async_start(loop, &async->watcher);
async->loop = loop;
dispatch_tasks(async);
}
void
oi_async_detach (oi_async *async)
{
if(async->loop == NULL)
return;
ev_async_stop(async->loop, &async->watcher);
async->loop = NULL;
active_watchers--;
if(active_watchers == 0) {
//stop_workers();
}
}
void
oi_async_submit (oi_async *async, oi_task *task)
{
assert(!task->active);
assert(task->async == NULL);
task->async = async;
task->active = 1;
oi_queue_insert_head(&async->new_tasks, &task->queue);
if(ev_is_active(&async->watcher)) {
dispatch_tasks(async);
}
}

218
deps/liboi/oi_async.h

@ -1,218 +0,0 @@
#include <ev.h>
#include <pthread.h>
#include <netdb.h>
#include <oi.h>
#ifndef oi_async_h
#define oi_async_h
#ifdef __cplusplus
extern "C" {
#endif
typedef struct oi_async oi_async;
typedef struct oi_task oi_task;
struct oi_async {
/* private */
ev_async watcher;
struct ev_loop *loop;
oi_queue finished_tasks;
oi_queue new_tasks;
/* public */
void *data;
};
typedef void (*oi_task_int_cb)(oi_task *, int result);
typedef void (*oi_task_uint_cb)(oi_task *, unsigned int result);
typedef void (*oi_task_ssize_cb)(oi_task *, ssize_t result);
struct oi_task {
/* private */
oi_async *async;
oi_queue queue;
int type;
union {
struct {
const char *pathname;
int flags;
mode_t mode;
oi_task_int_cb cb;
int result;
} open;
struct {
int fd;
void *buf;
size_t count;
oi_task_ssize_cb cb;
ssize_t result;
} read;
struct {
int fd;
const void *buf;
size_t count;
oi_task_ssize_cb cb;
ssize_t result;
} write;
struct {
int fd;
oi_task_int_cb cb;
int result;
} close;
struct {
unsigned int seconds;
oi_task_uint_cb cb;
unsigned int result;
} sleep;
struct {
int out_fd;
int in_fd;
off_t offset;
size_t count;
oi_task_ssize_cb cb;
ssize_t result;
} eio__sendfile;
struct {
const char *nodename; /* restrict ? */
const char *servname; /* restrict ? */
struct addrinfo *hints;
struct addrinfo **res; /* restrict ? */
oi_task_int_cb cb;
int result;
} getaddrinfo;
struct {
const char *path;
struct stat *buf;
oi_task_int_cb cb;
int result;
} lstat;
} params;
/* read-only */
volatile unsigned active:1;
int errorno;
/* public */
void *data;
};
void oi_async_init (oi_async *);
void oi_async_attach (struct ev_loop *loop, oi_async *);
void oi_async_detach (oi_async *);
void oi_async_submit (oi_async *, oi_task *);
/* To submit a task for async processing
* (0) allocate memory for your task
* (1) initialize the task with one of the functions below
* (2) optionally set the task->data pointer
* (3) oi_async_submit() the task
*/
enum { OI_TASK_OPEN
, OI_TASK_READ
, OI_TASK_WRITE
, OI_TASK_CLOSE
, OI_TASK_SLEEP
, OI_TASK_SENDFILE
, OI_TASK_GETADDRINFO
, OI_TASK_LSTAT
};
#define oi_task_init_common(task, _type) do {\
(task)->active = 0;\
(task)->async = NULL;\
(task)->type = _type;\
} while(0)
static inline void
oi_task_init_open(oi_task *t, oi_task_int_cb cb, const char *pathname, int flags, mode_t mode)
{
oi_task_init_common(t, OI_TASK_OPEN);
t->params.open.cb = cb;
t->params.open.pathname = pathname;
t->params.open.flags = flags;
t->params.open.mode = mode;
}
static inline void
oi_task_init_read(oi_task *t, oi_task_ssize_cb cb, int fd, void *buf, size_t count)
{
oi_task_init_common(t, OI_TASK_READ);
t->params.read.cb = cb;
t->params.read.fd = fd;
t->params.read.buf = buf;
t->params.read.count = count;
}
static inline void
oi_task_init_write(oi_task *t, oi_task_ssize_cb cb, int fd, const void *buf, size_t count)
{
oi_task_init_common(t, OI_TASK_WRITE);
t->params.write.cb = cb;
t->params.write.fd = fd;
t->params.write.buf = buf;
t->params.write.count = count;
}
static inline void
oi_task_init_close(oi_task *t, oi_task_int_cb cb, int fd)
{
oi_task_init_common(t, OI_TASK_CLOSE);
t->params.close.cb = cb;
t->params.close.fd = fd;
}
static inline void
oi_task_init_sleep(oi_task *t, oi_task_uint_cb cb, unsigned int seconds)
{
oi_task_init_common(t, OI_TASK_SLEEP);
t->params.sleep.cb = cb;
t->params.sleep.seconds = seconds;
}
static inline void
oi_task_init_sendfile(oi_task *t, oi_task_ssize_cb cb, int out_fd, int in_fd, off_t offset, size_t count)
{
oi_task_init_common(t, OI_TASK_SENDFILE);
t->params.eio__sendfile.cb = cb;
t->params.eio__sendfile.out_fd = out_fd;
t->params.eio__sendfile.in_fd = in_fd;
t->params.eio__sendfile.offset = offset;
t->params.eio__sendfile.count = count;
}
static inline void
oi_task_init_getaddrinfo(oi_task *t, oi_task_int_cb cb, const char *node,
const char *service, struct addrinfo *hints, struct addrinfo **res)
{
oi_task_init_common(t, OI_TASK_GETADDRINFO);
t->params.getaddrinfo.cb = cb;
t->params.getaddrinfo.nodename = node;
t->params.getaddrinfo.servname = service;
t->params.getaddrinfo.hints = hints;
t->params.getaddrinfo.res = res;
}
static inline void
oi_task_init_lstat(oi_task *t, oi_task_int_cb cb, const char *path, struct stat *buf)
{
oi_task_init_common(t, OI_TASK_LSTAT);
t->params.lstat.cb = cb;
t->params.lstat.path = path;
t->params.lstat.buf = buf;
}
#ifdef __cplusplus
}
#endif
#endif /* oi_async_h */

25
deps/liboi/oi_error.h

@ -4,18 +4,21 @@
extern "C" {
#endif
enum oi_error_domain
{ OI_ERROR_GNUTLS
, OI_ERROR_EV
, OI_ERROR_CLOSE
, OI_ERROR_SHUTDOWN
, OI_ERROR_OPEN
, OI_ERROR_SEND
, OI_ERROR_RECV
, OI_ERROR_WRITE
, OI_ERROR_READ
, OI_ERROR_SENDFILE
};
struct oi_error {
enum { OI_ERROR_GNUTLS
, OI_ERROR_EV
, OI_ERROR_CLOSE
, OI_ERROR_SHUTDOWN
, OI_ERROR_OPEN
, OI_ERROR_SEND
, OI_ERROR_RECV
, OI_ERROR_WRITE
, OI_ERROR_READ
, OI_ERROR_SENDFILE
} domain;
enum oi_error_domain domain;
int code; /* errno */
};

391
deps/liboi/oi_file.c

@ -1,391 +0,0 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <ev.h>
#include <oi.h>
#define RELEASE_BUF(buf) if(buf->release) { buf->release(buf); }
#define DRAIN_CB(file) if(file->on_drain) { file->on_drain(file); }
#define RAISE_ERROR(s, _domain, _code) do { \
if(s->on_error) { \
struct oi_error __oi_error; \
__oi_error.domain = _domain; \
__oi_error.code = _code; \
s->on_error(s, __oi_error); \
} \
} while(0) \
/* forwards */
static void dispatch_write_buf (oi_file *file);
static void maybe_do_read (oi_file *file);
static void
after_read(oi_task *task, ssize_t recved)
{
oi_file *file = task->data;
if(recved == -1) {
RAISE_ERROR(file, OI_ERROR_READ, errno);
return;
}
if(recved == 0)
oi_file_read_stop(file);
if(file->on_read)
file->on_read(file, recved);
maybe_do_read(file);
}
static void
maybe_do_read(oi_file *file)
{
if ( file->read_buffer == NULL
|| file->write_buf != NULL
|| file->write_socket != NULL
|| !oi_queue_empty(&file->write_queue)
|| file->io_task.active
) return;
assert(file->fd > 0);
oi_task_init_read ( &file->io_task
, after_read
, file->fd
, file->read_buffer
, file->read_buffer_size
);
file->io_task.data = file;
oi_async_submit(&file->async, &file->io_task);
}
static void
submit_read (oi_file *file)
{
}
int
oi_file_init (oi_file *file)
{
oi_async_init(&file->async);
file->async.data = file;
oi_queue_init(&file->write_queue);
file->fd = -1;
file->loop = NULL;
file->write_buf = NULL;
file->read_buffer = NULL;
file->on_open = NULL;
file->on_read = NULL;
file->on_drain = NULL;
file->on_error = NULL;
file->on_close = NULL;
return 0;
}
void
oi_file_read_start (oi_file *file, void *buffer, size_t bufsize)
{
file->read_buffer = buffer;
file->read_buffer_size = bufsize;
maybe_do_read(file);
}
void
oi_file_read_stop (oi_file *file)
{
file->read_buffer = NULL;
}
void
oi_api_free_buf_with_heap_base(oi_buf *buf)
{
free(buf->base);
free(buf);
}
static void
after_open(oi_task *task, int result)
{
oi_file *file = task->data;
if(result == -1) {
RAISE_ERROR(file, OI_ERROR_OPEN, errno);
return;
}
file->fd = result;
if(file->on_open) {
file->on_open(file);
}
maybe_do_read(file);
}
int
oi_file_open_path (oi_file *file, const char *path, int flags, mode_t mode)
{
if(file->fd >= 0)
return -1;
oi_task_init_open( &file->io_task
, after_open
, path
, flags
, mode
);
file->io_task.data = file;
oi_async_submit(&file->async, &file->io_task);
return 0;
}
int
oi_file_open_stdin (oi_file *file)
{
if(file->fd >= 0)
return -1;
file->fd = STDIN_FILENO;
if(file->on_open)
file->on_open(file);
return 0;
}
int
oi_file_open_stdout (oi_file *file)
{
if(file->fd >= 0)
return -1;
file->fd = STDOUT_FILENO;
if(file->on_open)
file->on_open(file);
return 0;
}
int
oi_file_open_stderr (oi_file *file)
{
if(file->fd >= 0)
return -1;
file->fd = STDERR_FILENO;
if(file->on_open)
file->on_open(file);
return 0;
}
void
oi_file_attach (oi_file *file, struct ev_loop *loop)
{
oi_async_attach (loop, &file->async);
file->loop = loop;
}
void
oi_file_detach (oi_file *file)
{
oi_async_detach (&file->async);
file->loop = NULL;
}
static void
after_write(oi_task *task, ssize_t result)
{
oi_file *file = task->data;
if(result == -1) {
RAISE_ERROR(file, OI_ERROR_WRITE, errno);
return;
}
assert(file->write_buf != NULL);
oi_buf *buf = file->write_buf;
buf->written += result;
if(buf->written < buf->len) {
oi_task_init_write ( &file->io_task
, after_write
, file->fd
, buf->base + buf->written
, buf->len - buf->written
);
file->io_task.data = file;
oi_async_submit(&file->async, &file->io_task);
return;
}
assert(buf->written == buf->len);
RELEASE_BUF(file->write_buf);
file->write_buf = NULL;
if(oi_queue_empty(&file->write_queue)) {
DRAIN_CB(file);
maybe_do_read(file);
} else {
dispatch_write_buf(file);
}
return;
}
static void
dispatch_write_buf(oi_file *file)
{
if(file->write_buf != NULL)
return;
if(oi_queue_empty(&file->write_queue))
return;
oi_queue *q = oi_queue_last(&file->write_queue);
oi_queue_remove(q);
oi_buf *buf = file->write_buf = oi_queue_data(q, oi_buf, queue);
assert(!file->io_task.active);
oi_task_init_write ( &file->io_task
, after_write
, file->fd
, buf->base + buf->written
, buf->len - buf->written
);
file->io_task.data = file;
oi_async_submit(&file->async, &file->io_task);
}
int
oi_file_write (oi_file *file, oi_buf *buf)
{
if(file->fd < 0)
return -1;
if(file->read_buffer)
return -2;
/* TODO better business check*/
buf->written = 0;
oi_queue_insert_head(&file->write_queue, &buf->queue);
dispatch_write_buf(file);
return 0;
}
// Writes a string to the file.
// NOTE: Allocates memory. Avoid for performance applications.
int
oi_file_write_simple (oi_file *file, const char *str, size_t len)
{
if(file->fd < 0)
return -1;
if(file->read_buffer)
return -2;
/* TODO better business check*/
oi_buf *buf = malloc(sizeof(oi_buf));
buf->base = malloc(len);
memcpy(buf->base, str, len);
buf->len = len;
buf->release = oi_api_free_buf_with_heap_base;
oi_file_write(file, buf);
return 0;
}
static void
clear_write_queue(oi_file *file)
{
while(!oi_queue_empty(&file->write_queue)) {
oi_queue *q = oi_queue_last(&file->write_queue);
oi_queue_remove(q);
oi_buf *buf = oi_queue_data(q, oi_buf, queue);
RELEASE_BUF(buf);
}
}
static void
after_close(oi_task *task, int result)
{
oi_file *file = task->data;
assert(oi_queue_empty(&file->write_queue));
if(result == -1) {
RAISE_ERROR(file, OI_ERROR_CLOSE, errno);
return;
// TODO try to close again?
}
file->fd = -1;
// TODO deinit task_queue, detach thread_pool_result_watcher
if(file->on_close) {
file->on_close(file);
}
return;
}
void
oi_file_close (oi_file *file)
{
assert(file->fd >= 0 && "file not open!");
clear_write_queue(file);
oi_task_init_close ( &file->io_task
, after_close
, file->fd
);
file->io_task.data = file;
oi_async_submit(&file->async, &file->io_task);
}
static void
after_sendfile(oi_task *task, ssize_t sent)
{
oi_file *file = task->data;
oi_socket *socket = file->write_socket;
assert(socket != NULL);
file->write_socket = NULL;
if(sent == -1) {
RAISE_ERROR(file, OI_ERROR_SENDFILE, errno);
return;
}
if(socket->on_drain) {
socket->on_drain(socket);
}
maybe_do_read(file);
}
int
oi_file_send (oi_file *file, oi_socket *destination, off_t offset, size_t count)
{
if(file->fd < 0)
return -1;
if(file->read_buffer)
return -2;
/* TODO better business check*/
assert(file->write_socket == NULL);
// (1) make sure the write queue on the socket is cleared.
//
// (2)
//
file->write_socket = destination;
oi_task_init_sendfile ( &file->io_task
, after_sendfile
, destination->fd
, file->fd
, offset
, count
);
file->io_task.data = file;
oi_async_submit(&file->async, &file->io_task);
return 0;
}

58
deps/liboi/oi_file.h

@ -1,58 +0,0 @@
#include <oi.h>
#include <ev.h>
#ifdef __cplusplus
extern "C" {
#endif
#ifndef oi_file_h
#define oi_file_h
typedef struct oi_file oi_file;
int oi_file_init (oi_file *);
void oi_file_attach (oi_file *, struct ev_loop *);
void oi_file_detach (oi_file *);
/* WARNING oi_file_open_path: path argument must be valid until oi_file
* object is closed and the on_close() callback is made. oi does not strdup
* the path pointer. */
int oi_file_open_path (oi_file *, const char *path, int flags, mode_t mode);
int oi_file_open_stdin (oi_file *);
int oi_file_open_stdout (oi_file *);
int oi_file_open_stderr (oi_file *);
void oi_file_read_start (oi_file *, void *buffer, size_t bufsize);
void oi_file_read_stop (oi_file *);
int oi_file_write (oi_file *, oi_buf *to_write);
int oi_file_write_simple (oi_file *, const char *, size_t);
int oi_file_send (oi_file *source, oi_socket *destination, off_t offset, size_t length);
void oi_file_close (oi_file *);
struct oi_file {
/* private */
oi_async async;
oi_task io_task;
struct ev_loop *loop;
oi_queue write_queue;
oi_buf *write_buf; /* TODO this pointer is unnecessary - remove and just look at first element of the queue */
oi_socket *write_socket;
void *read_buffer;
size_t read_buffer_size;
/* read-only */
int fd;
/* public */
void (*on_open) (oi_file *);
void (*on_read) (oi_file *, size_t count);
void (*on_drain) (oi_file *);
void (*on_error) (oi_file *, struct oi_error);
void (*on_close) (oi_file *);
void *data;
};
#ifdef __cplusplus
}
#endif
#endif /* oi_file_h */

93
deps/liboi/oi_socket.c

@ -11,6 +11,14 @@
#include <ev.h>
#include <oi_socket.h>
#if EV_MULTIPLICITY
# define SOCKET_LOOP_ socket->loop,
# define SERVER_LOOP_ server->loop,
#else
# define SOCKET_LOOP_
# define SERVER_LOOP_
#endif // EV_MULTIPLICITY
#if HAVE_GNUTLS
# include <gnutls/gnutls.h>
# define GNUTLS_NEED_WRITE (gnutls_record_get_direction(socket->session) == 1)
@ -49,8 +57,8 @@ full_close(oi_socket *socket)
socket->read_action = NULL;
socket->write_action = NULL;
if(socket->loop) {
ev_feed_event(socket->loop, &socket->read_watcher, EV_READ);
if(socket->attached) {
ev_feed_event(SOCKET_LOOP_ &socket->read_watcher, EV_READ);
}
return OKAY;
}
@ -88,7 +96,7 @@ update_write_buffer_after_send(oi_socket *socket, ssize_t sent)
}
if(oi_queue_empty(&socket->out_stream)) {
ev_io_stop(socket->loop, &socket->write_watcher);
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
if(socket->on_drain)
socket->on_drain(socket);
}
@ -160,7 +168,7 @@ secure_socket_send(oi_socket *socket)
ssize_t sent;
if(oi_queue_empty(&socket->out_stream)) {
ev_io_stop(socket->loop, &socket->write_watcher);
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
return AGAIN;
}
@ -337,7 +345,7 @@ socket_send(oi_socket *socket)
assert(socket->secure == FALSE);
if(oi_queue_empty(&socket->out_stream)) {
ev_io_stop(socket->loop, &socket->write_watcher);
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
return AGAIN;
}
@ -477,14 +485,16 @@ assign_file_descriptor(oi_socket *socket, int fd)
* Called by server->connection_watcher.
*/
static void
on_connection(struct ev_loop *loop, ev_io *watcher, int revents)
on_connection(EV_P_ ev_io *watcher, int revents)
{
oi_server *server = watcher->data;
// printf("on connection!\n");
assert(server->listening);
#if EV_MULTIPLICITY
assert(server->loop == loop);
#endif
assert(&server->connection_watcher == watcher);
if(EV_ERROR & revents) {
@ -524,7 +534,7 @@ on_connection(struct ev_loop *loop, ev_io *watcher, int revents)
socket->server = server;
assign_file_descriptor(socket, fd);
oi_socket_attach(socket, loop);
oi_socket_attach(EV_A_ socket);
}
int
@ -595,23 +605,30 @@ oi_server_close(oi_server *server)
}
void
oi_server_attach (oi_server *server, struct ev_loop *loop)
oi_server_attach (EV_P_ oi_server *server)
{
ev_io_start (loop, &server->connection_watcher);
ev_io_start (EV_A_ &server->connection_watcher);
#if EV_MULTIPLICITY
server->loop = loop;
#endif
server->attached = TRUE;
}
void
oi_server_detach (oi_server *server)
{
ev_io_stop (server->loop, &server->connection_watcher);
ev_io_stop (SERVER_LOOP_ &server->connection_watcher);
#if EV_MULTIPLICITY
server->loop = NULL;
#endif
server->attached = FALSE;
}
void
oi_server_init(oi_server *server, int backlog)
{
server->backlog = backlog;
server->attached = FALSE;
server->listening = FALSE;
server->fd = -1;
server->connection_watcher.data = server;
@ -624,7 +641,7 @@ oi_server_init(oi_server *server, int backlog)
/* Internal callback. called by socket->timeout_watcher */
static void
on_timeout(struct ev_loop *loop, ev_timer *watcher, int revents)
on_timeout(EV_P_ ev_timer *watcher, int revents)
{
oi_socket *socket = watcher->data;
@ -652,7 +669,7 @@ release_write_buffer(oi_socket *socket)
/* Internal callback. called by socket->read_watcher */
static void
on_io_event(struct ev_loop *loop, ev_io *watcher, int revents)
on_io_event(EV_P_ ev_io *watcher, int revents)
{
oi_socket *socket = watcher->data;
@ -698,9 +715,9 @@ on_io_event(struct ev_loop *loop, ev_io *watcher, int revents)
close:
release_write_buffer(socket);
ev_clear_pending (socket->loop, &socket->write_watcher);
ev_clear_pending (socket->loop, &socket->read_watcher);
ev_clear_pending (socket->loop, &socket->timeout_watcher);
ev_clear_pending (EV_A_ &socket->write_watcher);
ev_clear_pending (EV_A_ &socket->read_watcher);
ev_clear_pending (EV_A_ &socket->timeout_watcher);
oi_socket_detach(socket);
@ -721,7 +738,10 @@ oi_socket_init(oi_socket *socket, float timeout)
{
socket->fd = -1;
socket->server = NULL;
#if EV_MULTIPLICITY
socket->loop = NULL;
#endif
socket->attached = FALSE;
socket->connected = FALSE;
oi_queue_init(&socket->out_stream);
@ -765,8 +785,8 @@ oi_socket_write_eof (oi_socket *socket)
)
{
socket->write_action = secure_half_goodbye;
if(socket->loop)
ev_io_start(socket->loop, &socket->write_watcher);
if(socket->attached)
ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
return;
}
/* secure servers cannot handle half-closed connections? */
@ -799,8 +819,8 @@ oi_socket_close (oi_socket *socket)
socket->read_action = NULL;
}
if(socket->loop)
ev_io_start(socket->loop, &socket->write_watcher);
if(socket->attached)
ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
return;
}
@ -815,7 +835,7 @@ oi_socket_close (oi_socket *socket)
void
oi_socket_reset_timeout(oi_socket *socket)
{
ev_timer_again(socket->loop, &socket->timeout_watcher);
ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher);
}
/**
@ -831,7 +851,8 @@ oi_socket_write(oi_socket *socket, oi_buf *buf)
oi_queue_insert_head(&socket->out_stream, &buf->queue);
buf->written = 0;
ev_io_start(socket->loop, &socket->write_watcher);
// XXX if (socket->attached) ??
ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
}
static void
@ -856,45 +877,51 @@ oi_socket_write_simple(oi_socket *socket, const char *str, size_t len)
}
void
oi_socket_attach(oi_socket *socket, struct ev_loop *loop)
oi_socket_attach(EV_P_ oi_socket *socket)
{
#if EV_MULTIPLICITY
socket->loop = loop;
#endif
socket->attached = TRUE;
ev_timer_again(loop, &socket->timeout_watcher);
ev_timer_again(EV_A_ &socket->timeout_watcher);
if(socket->read_action)
ev_io_start(loop, &socket->read_watcher);
ev_io_start(EV_A_ &socket->read_watcher);
if(socket->write_action)
ev_io_start(loop, &socket->write_watcher);
ev_io_start(EV_A_ &socket->write_watcher);
/* make sure the io_event happens soon in the case we're being reattached */
ev_feed_event(loop, &socket->read_watcher, EV_READ);
ev_feed_event(EV_A_ &socket->read_watcher, EV_READ);
}
void
oi_socket_detach(oi_socket *socket)
{
if(socket->loop) {
ev_io_stop(socket->loop, &socket->write_watcher);
ev_io_stop(socket->loop, &socket->read_watcher);
ev_timer_stop(socket->loop, &socket->timeout_watcher);
if(socket->attached) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
ev_timer_stop(SOCKET_LOOP_ &socket->timeout_watcher);
#if EV_MULTIPLICITY
socket->loop = NULL;
#endif
socket->attached = FALSE;
}
}
void
oi_socket_read_stop (oi_socket *socket)
{
ev_io_stop(socket->loop, &socket->read_watcher);
ev_clear_pending (socket->loop, &socket->read_watcher);
ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
ev_clear_pending (SOCKET_LOOP_ &socket->read_watcher);
}
void
oi_socket_read_start (oi_socket *socket)
{
if(socket->read_action) {
ev_io_start(socket->loop, &socket->read_watcher);
ev_io_start(SOCKET_LOOP_ &socket->read_watcher);
/* XXX feed event? */
}
}

10
deps/liboi/oi_socket.h

@ -22,14 +22,14 @@ typedef struct oi_socket oi_socket;
void oi_server_init (oi_server *, int backlog);
int oi_server_listen (oi_server *, struct addrinfo *addrinfo);
void oi_server_attach (oi_server *, struct ev_loop *loop);
void oi_server_attach (EV_P_ oi_server *);
void oi_server_detach (oi_server *);
void oi_server_close (oi_server *);
void oi_socket_init (oi_socket *, float timeout);
int oi_socket_pair (oi_socket *a, oi_socket *b); /* TODO */
int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo);
void oi_socket_attach (oi_socket *, struct ev_loop *loop);
void oi_socket_attach (EV_P_ oi_socket *);
void oi_socket_detach (oi_socket *);
void oi_socket_read_start (oi_socket *);
void oi_socket_read_stop (oi_socket *);
@ -46,7 +46,10 @@ struct oi_server {
/* read only */
int fd;
int backlog;
#if EV_MULTIPLICITY
struct ev_loop *loop;
#endif
unsigned attached:1;
unsigned listening:1;
/* private */
@ -61,10 +64,13 @@ struct oi_server {
struct oi_socket {
/* read only */
int fd;
#if EV_MULTIPLICITY
struct ev_loop *loop;
#endif
oi_server *server;
oi_queue out_stream;
size_t written;
unsigned attached:1;
unsigned connected:1;
unsigned secure:1;
unsigned wait_for_secure_hangup:1;

7
deps/liboi/test/connection_interruption.c

@ -86,7 +86,6 @@ int
main(int argc, const char *argv[])
{
int r;
struct ev_loop *loop = ev_default_loop(0);
oi_server_init(&server, 1000);
server.on_connection = on_server_connection;
@ -126,7 +125,7 @@ main(int argc, const char *argv[])
#endif
oi_server_listen(&server, servinfo);
oi_server_attach(&server, loop);
oi_server_attach(EV_DEFAULT_ &server);
int i;
for(i = 0; i < NCONN; i++) {
@ -144,10 +143,10 @@ main(int argc, const char *argv[])
#endif
r = oi_socket_connect(client, servinfo);
assert(r == 0 && "problem connecting");
oi_socket_attach(client, loop);
oi_socket_attach(EV_DEFAULT_ client);
}
ev_loop(loop, 0);
ev_loop(EV_DEFAULT_ 0);
assert(nconnections == NCONN);

5
deps/liboi/test/echo.c

@ -44,7 +44,6 @@ int
main(int argc, const char *argv[])
{
int r;
struct ev_loop *loop = ev_default_loop(0);
oi_server server;
oi_socket client;
@ -90,9 +89,9 @@ main(int argc, const char *argv[])
#endif
r = oi_server_listen(&server, servinfo);
assert(r == 0);
oi_server_attach(&server, loop);
oi_server_attach(EV_DEFAULT_ &server);
ev_loop(loop, 0);
ev_loop(EV_DEFAULT_ 0);
#if TCP
freeaddrinfo(servinfo);

223
deps/liboi/test/fancy_copy.c

@ -1,223 +0,0 @@
/* This test uses most of oi's facilities. It starts by opening a new file
* and writing N characters to it. Once the first chunk is written, it opens
* that file with another handle
*
* /tmp/oi_test_src == /tmp/oi_test_dst
* | ^
* | |
* stream | | write
* | |
* V |
* client ----------> server/connection
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdint.h>
#ifndef TRUE
# define TRUE 1
#endif
#ifndef FALSE
# define FALSE 0
#endif
#include <oi.h>
#include <oi_file.h>
#define PORT "5555"
static struct ev_loop *loop;
static oi_file file_src;
static oi_file file_dst;
static oi_socket client;
static oi_server server;
static oi_socket connection;
static int got_connection = 0;
static char *src_filename;
static char *dst_filename;
static void
file_error(oi_file *_)
{
assert(0);
}
static void
on_file_dst_open (oi_file *_)
{
assert(connection.fd > 0);
oi_socket_read_start(&connection);
//printf("file_dst is open\n");
}
static void
on_file_dst_close (oi_file *_)
{
//printf("file dst closed\n");
oi_file_detach(&file_dst);
}
static void
on_connection_read (oi_socket *_, const void *buf, size_t count)
{
assert(file_dst.fd > 0);
if(count == 0) {
file_dst.on_drain = oi_file_close;
oi_socket_close(&connection);
} else {
oi_file_write_simple(&file_dst, buf, count);
}
}
static void
on_connection_connect (oi_socket *_)
{
oi_file_init(&file_dst);
file_dst.on_open = on_file_dst_open;
file_dst.on_close = on_file_dst_close;
oi_file_open_path(&file_dst, dst_filename, O_WRONLY | O_CREAT, 0644);
oi_file_attach(&file_dst, loop);
oi_socket_read_stop(&connection);
}
static void
on_connection_timeout (oi_socket *_)
{
assert(0);
}
static void
on_connection_error (oi_socket *_, struct oi_error e)
{
assert(0);
}
static void
on_connection_close (oi_socket *_)
{
oi_server_close(&server);
oi_server_detach(&server);
}
static oi_socket*
on_server_connection(oi_server *_, struct sockaddr *addr, socklen_t len)
{
assert(got_connection == FALSE);
oi_socket_init(&connection, 5.0);
connection.on_connect = on_connection_connect;
connection.on_read = on_connection_read;
connection.on_error = on_connection_error;
connection.on_close = on_connection_close;
connection.on_timeout = on_connection_timeout;
connection.on_drain = oi_socket_close;
got_connection = TRUE;
//printf("on server connection\n");
return &connection;
}
static void
on_client_read (oi_socket *_, const void *buf, size_t count)
{
assert(0);
}
static void
on_client_error (oi_socket *_, struct oi_error e)
{
assert(0);
}
static void
on_client_connect (oi_socket *_)
{
if(file_src.fd > 0) {
oi_file_send(&file_src, &client, 0, 50*1024);
}
}
static void
on_client_drain (oi_socket *_)
{
oi_socket_close(&client);
oi_file_close(&file_src);
}
static void
on_file_src_open (oi_file *_)
{
if(client.fd > 0) {
oi_file_send(&file_src, &client, 0, 50*1024);
}
}
static void
on_client_timeout (oi_socket *_)
{
assert(0);
}
int
main(int argc, char *argv[])
{
int r;
loop = ev_default_loop(0);
assert(argc == 3);
src_filename = argv[1];
dst_filename = argv[2];
assert(strlen(src_filename) > 0);
assert(strlen(dst_filename) > 0);
oi_server_init(&server, 10);
server.on_connection = on_server_connection;
struct addrinfo *servinfo;
struct addrinfo hints;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
r = getaddrinfo(NULL, PORT, &hints, &servinfo);
assert(r == 0);
r = oi_server_listen(&server, servinfo);
assert(r >= 0 && "problem listening");
oi_server_attach(&server, loop);
oi_socket_init(&client, 5.0);
client.on_read = on_client_read;
client.on_error = on_client_error;
client.on_connect = on_client_connect;
client.on_timeout = on_client_timeout;
//client.on_close = oi_socket_detach;
client.on_drain = on_client_drain;
r = oi_socket_connect(&client, servinfo);
assert(r == 0 && "problem connecting");
oi_socket_attach(&client, loop);
oi_file_init(&file_src);
file_src.on_open = on_file_src_open;
file_src.on_drain = file_error;
file_src.on_close = oi_file_detach;
oi_file_open_path(&file_src, src_filename, O_RDONLY, 0700);
oi_file_attach(&file_src, loop);
ev_loop(loop, 0);
printf("\noi_file: %d bytes\n", sizeof(oi_file));
printf("oi_socket: %d bytes\n", sizeof(oi_socket));
return 0;
}

9
deps/liboi/test/ping_pong.c

@ -30,7 +30,7 @@ static void
on_client_close(oi_socket *socket)
{
//printf("client connection closed\n");
ev_unloop(socket->loop, EVUNLOOP_ALL);
ev_unloop(EV_DEFAULT_ EVUNLOOP_ALL);
}
static oi_socket*
@ -87,7 +87,6 @@ int
main(int argc, const char *argv[])
{
int r;
struct ev_loop *loop = ev_default_loop(0);
oi_server server;
oi_socket client;
@ -133,7 +132,7 @@ main(int argc, const char *argv[])
#endif
r = oi_server_listen(&server, servinfo);
assert(r == 0);
oi_server_attach(&server, loop);
oi_server_attach(EV_DEFAULT_ &server);
oi_socket_init(&client, 5.0);
client.on_read = on_client_read;
@ -150,9 +149,9 @@ main(int argc, const char *argv[])
r = oi_socket_connect(&client, servinfo);
assert(r == 0 && "problem connecting");
oi_socket_attach(&client, loop);
oi_socket_attach(EV_DEFAULT_ &client);
ev_loop(loop, 0);
ev_loop(EV_DEFAULT_ 0);
assert(successful_ping_count == EXCHANGES + 1);
assert(nconnections == 1);

54
deps/liboi/test/sleeping_tasks.c

@ -1,54 +0,0 @@
#include <unistd.h> /* sleep() */
#include <stdlib.h> /* malloc(), free() */
#include <assert.h>
#include <ev.h>
#include <oi_async.h>
#define SLEEPS 4
static int runs = 0;
static void
done (oi_task *task, unsigned int result)
{
assert(result == 0);
if(++runs == SLEEPS) {
ev_timer *timer = task->data;
ev_timer_stop(task->async->loop, timer);
oi_async_detach(task->async);
}
free(task);
}
static void
on_timeout(struct ev_loop *loop, ev_timer *w, int events)
{
assert(0 && "timeout before all sleeping tasks were complete!");
}
int
main()
{
struct ev_loop *loop = ev_default_loop(0);
oi_async async;
ev_timer timer;
int i;
oi_async_init(&async);
for(i = 0; i < SLEEPS; i++) {
oi_task *task = malloc(sizeof(oi_task));
oi_task_init_sleep(task, done, 1);
task->data = &timer;
oi_async_submit(&async, task);
}
oi_async_attach(loop, &async);
ev_timer_init (&timer, on_timeout, 1.2, 0.);
ev_timer_start (loop, &timer);
ev_loop(loop, 0);
assert(runs == SLEEPS);
return 0;
}

64
deps/liboi/test/stdout.c

@ -1,64 +0,0 @@
#include <unistd.h> /* sleep() */
#include <stdlib.h> /* malloc(), free() */
#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <ev.h>
#include <oi_file.h>
static oi_file file;
static oi_file out;
#define READ_BUFSIZE (5)
static char read_buf[READ_BUFSIZE];
# define SEP "~~~~~~~~~~~~~~~~~~~~~~\n"
static void
on_open(oi_file *f)
{
oi_file_write_simple(&out, "\n", 1);
oi_file_write_simple(&out, SEP, sizeof(SEP));
oi_file_read_start(f, read_buf, READ_BUFSIZE);
}
static void
on_close(oi_file *f)
{
oi_file_write_simple(&out, SEP, sizeof(SEP));
oi_file_detach(f);
out.on_drain = oi_file_detach;
}
static void
on_read(oi_file *f, size_t recved)
{
if(recved == 0) /* EOF */
oi_file_close(f);
else
oi_file_write_simple(&out, read_buf, recved);
}
int
main()
{
struct ev_loop *loop = ev_default_loop(0);
oi_file_init(&file);
file.on_open = on_open;
file.on_read = on_read;
file.on_close = on_close;
oi_file_open_path(&file, "LICENSE", O_RDONLY, 0);
oi_file_attach(&file, loop);
oi_file_init(&out);
oi_file_open_stdout(&out);
oi_file_attach(&out, loop);
ev_loop(loop, 0);
return 0;
}

2
src/http.cc

@ -588,7 +588,7 @@ HttpServer::Start(struct addrinfo *servinfo)
{
int r = oi_server_listen(&server, servinfo);
if(r == 0)
oi_server_attach(&server, node_loop());
oi_server_attach(EV_DEFAULT_UC_ &server);
return r;
}

4
src/net.cc

@ -159,7 +159,7 @@ Server::ListenTCP (const Arguments& args)
r = oi_server_listen(&server->server_, address);
if (r != 0)
return ThrowException(String::New("Error listening on port"));
oi_server_attach(&server->server_, node_loop());
oi_server_attach(EV_DEFAULT_UC_ &server->server_);
freeaddrinfo(address);
@ -349,7 +349,7 @@ Socket::AfterResolve (eio_req *req)
// no error. return.
if(r == 0 && req->result == 0) {
oi_socket_attach (&socket->socket_, node_loop());
oi_socket_attach (EV_DEFAULT_UC_ &socket->socket_);
return 0;
}

8
src/node.cc

@ -129,14 +129,14 @@ void
node_fatal_exception (TryCatch &try_catch)
{
ReportException(&try_catch);
ev_unloop(node_loop(), EVUNLOOP_ALL);
ev_unloop(EV_DEFAULT_UC_ EVUNLOOP_ALL);
exit_code = 1;
}
void node_exit (int code)
{
exit_code = code;
ev_unloop(node_loop(), EVUNLOOP_ALL);
ev_unloop(EV_DEFAULT_UC_ EVUNLOOP_ALL);
}
@ -170,6 +170,8 @@ node_eio_warmup (void)
int
main (int argc, char *argv[])
{
ev_default_loop(EVFLAG_AUTO); // initialize the default ev loop.
// start eio thread pool
ev_async_init(&thread_pool_watcher, thread_pool_cb);
eio_init(thread_pool_want_poll, NULL);
@ -220,7 +222,7 @@ main (int argc, char *argv[])
ExecuteString(String::New(native_main), String::New("main.js"));
if (try_catch.HasCaught()) goto native_js_error;
ev_loop(node_loop(), 0);
ev_loop(EV_DEFAULT_UC_ 0);
context.Dispose();

1
src/node.h

@ -13,7 +13,6 @@
enum encoding {UTF8, RAW};
void node_fatal_exception (v8::TryCatch &try_catch);
#define node_loop() ev_default_loop(0)
void node_exit (int code);
// call this after creating a new eio event.

4
src/timers.cc

@ -68,12 +68,12 @@ Timer::Timer (Handle<Function> callback, ev_tstamp after, ev_tstamp repeat)
ev_timer_init(&watcher_, Timer::OnTimeout, after, repeat);
watcher_.data = this;
ev_timer_start(node_loop(), &watcher_);
ev_timer_start(EV_DEFAULT_UC_ &watcher_);
}
Timer::~Timer ()
{
ev_timer_stop (node_loop(), &watcher_);
ev_timer_stop (EV_DEFAULT_UC_ &watcher_);
handle_->SetInternalField(0, Undefined());
handle_.Dispose();
}

Loading…
Cancel
Save