From 90fc8d36220c0d66c352ee5f72080b8592d310d5 Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 28 Apr 2009 22:37:03 +0200 Subject: [PATCH] Update liboi. Use EV_MULTIPLICITY=0. This might need to be changed in the future if ev is needed in thread pools or extension libraries. However for now it makes sense to just use a single loop. --- deps/libev/wscript | 3 + deps/liboi/config.mk | 11 +- deps/liboi/oi.h | 2 - deps/liboi/oi_async.c | 486 ---------------------- deps/liboi/oi_async.h | 218 ---------- deps/liboi/oi_error.h | 25 +- deps/liboi/oi_file.c | 391 ----------------- deps/liboi/oi_file.h | 58 --- deps/liboi/oi_socket.c | 93 +++-- deps/liboi/oi_socket.h | 10 +- deps/liboi/test/connection_interruption.c | 7 +- deps/liboi/test/echo.c | 5 +- deps/liboi/test/fancy_copy.c | 223 ---------- deps/liboi/test/ping_pong.c | 9 +- deps/liboi/test/sleeping_tasks.c | 54 --- deps/liboi/test/stdout.c | 64 --- src/http.cc | 2 +- src/net.cc | 4 +- src/node.cc | 8 +- src/node.h | 1 - src/timers.cc | 4 +- 21 files changed, 105 insertions(+), 1573 deletions(-) delete mode 100644 deps/liboi/oi_async.c delete mode 100644 deps/liboi/oi_async.h delete mode 100644 deps/liboi/oi_file.c delete mode 100644 deps/liboi/oi_file.h delete mode 100644 deps/liboi/test/fancy_copy.c delete mode 100644 deps/liboi/test/sleeping_tasks.c delete mode 100644 deps/liboi/test/stdout.c diff --git a/deps/libev/wscript b/deps/libev/wscript index d3885474f4..b713c0c424 100644 --- a/deps/libev/wscript +++ b/deps/libev/wscript @@ -46,6 +46,9 @@ def configure(conf): conf.define("HAVE_CONFIG_H", 1) conf.write_config_header('config.h') + conf.env.append_value('CCFLAGS', ['-DEV_MULTIPLICITY=0']) + conf.env.append_value('CXXFLAGS', ['-DEV_MULTIPLICITY=0']) + def build(bld): libev = bld.new_task_gen("cc", "staticlib") libev.source = 'ev.c' diff --git a/deps/liboi/config.mk b/deps/liboi/config.mk index 311f8cb535..b5d6fd0b82 100644 --- a/deps/liboi/config.mk +++ b/deps/liboi/config.mk @@ -5,15 +5,6 @@ EVDIR=$(HOME)/local/libev # Define GNUTLSDIR=/foo/bar if your gnutls header and library files are in # /foo/bar/include and /foo/bar/lib directories. #GNUTLSDIR=/usr -# -# -# Define NO_PREAD if you have a problem with pread() system call (e.g. -# cygwin.dll before v1.5.22). -# -# -# Define NO_SENDFILE if you have a problem with the sendfile() system call -# -# uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not') @@ -22,7 +13,7 @@ uname_R := $(shell sh -c 'uname -r 2>/dev/null || echo not') uname_P := $(shell sh -c 'uname -p 2>/dev/null || echo not') # CFLAGS and LDFLAGS are for the users to override from the command line. -CFLAGS = -g +CFLAGS = -g LDFLAGS = PREFIX = $(HOME)/local/liboi diff --git a/deps/liboi/oi.h b/deps/liboi/oi.h index c7145ea351..b4d7f0a3ec 100644 --- a/deps/liboi/oi.h +++ b/deps/liboi/oi.h @@ -2,7 +2,5 @@ #define oi_h #include -#include -#include #endif diff --git a/deps/liboi/oi_async.c b/deps/liboi/oi_async.c deleted file mode 100644 index 1d5459d625..0000000000 --- a/deps/liboi/oi_async.c +++ /dev/null @@ -1,486 +0,0 @@ -#include /* malloc() */ -#include /* perror() */ -#include -#include -#include /* read(), write() */ -#include -#include -#include -#include - -#if HAVE_SENDFILE -# if __linux -# include -# elif __freebsd -# include -# include -# elif __hpux -# include -# elif __solaris /* not yet */ -# include -# else -# error sendfile support requested but not available -# endif -#endif - -#include -#include - -#define NWORKERS 4 -/* TODO make adjustable - * once it is fix sleeping_tasks - */ - -static int active_watchers = 0; -static int active_workers = 0; -static int readiness_pipe[2] = {-1, -1}; -static oi_queue waiting_tasks; -static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_mutex_t finished_lock = PTHREAD_MUTEX_INITIALIZER; - -struct worker { - oi_task *task; - pthread_t thread; - pthread_attr_t thread_attr; -}; - -/* Sendfile and pread emulation come from Marc Lehmann's libeio and are - * Copyright (C)2007,2008 Marc Alexander Lehmann. - * Many ideas of oi_async.* are taken from libeio and in fact, I plan to - * use libeio once it becomes usable for me. (The problem is issuing tasks - * from multiple threads.) - */ - -#if !HAVE_PREADWRITE -/* - * make our pread/pwrite emulation safe against themselves, but not against - * normal read/write by using a mutex. slows down execution a lot, - * but that's your problem, not mine. - */ -static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER; -#endif - -#if !HAVE_PREADWRITE -# undef pread -# undef pwrite -# define pread eio__pread -# define pwrite eio__pwrite - -static ssize_t -eio__pread (int fd, void *buf, size_t count, off_t offset) -{ - ssize_t res; - off_t ooffset; - - pthread_mutex_lock(&preadwritelock); - ooffset = lseek (fd, 0, SEEK_CUR); - lseek (fd, offset, SEEK_SET); - res = read (fd, buf, count); - lseek (fd, ooffset, SEEK_SET); - pthread_mutex_unlock(&preadwritelock); - - return res; -} - -static ssize_t -eio__pwrite (int fd, void *buf, size_t count, off_t offset) -{ - ssize_t res; - off_t ooffset; - - pthread_mutex_lock(&preadwritelock); - ooffset = lseek (fd, 0, SEEK_CUR); - lseek (fd, offset, SEEK_SET); - res = write (fd, buf, count); - lseek (fd, offset, SEEK_SET); - pthread_mutex_unlock(&preadwritelock); - - return res; -} -#endif - - -/* sendfile always needs emulation */ -static ssize_t -eio__sendfile (int ofd, int ifd, off_t offset, size_t count) -{ - ssize_t res; - - if (!count) - return 0; - -#if HAVE_SENDFILE -# if __linux - res = sendfile (ofd, ifd, &offset, count); - -# elif __freebsd - /* - * Of course, the freebsd sendfile is a dire hack with no thoughts - * wasted on making it similar to other I/O functions. - */ - { - off_t sbytes; - res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0); - - if (res < 0 && sbytes) - /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */ - res = sbytes; - } - -# elif __hpux - res = sendfile (ofd, ifd, offset, count, 0, 0); - -# elif __solaris - { - struct sendfilevec vec; - size_t sbytes; - - vec.sfv_fd = ifd; - vec.sfv_flag = 0; - vec.sfv_off = offset; - vec.sfv_len = count; - - res = sendfilev (ofd, &vec, 1, &sbytes); - - if (res < 0 && sbytes) - res = sbytes; - } - -# endif -#else - res = -1; - errno = ENOSYS; -#endif - - if (res < 0 - && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK -#if __solaris - || errno == EAFNOSUPPORT || errno == EPROTOTYPE -#endif - ) - ) - { - /* emulate sendfile. this is a major pain in the ass */ -/* buffer size for various temporary buffers */ -#define EIO_BUFSIZE 65536 - char *eio_buf = malloc (EIO_BUFSIZE); - errno = ENOMEM; - if (!eio_buf) - return -1; - - res = 0; - - while (count) { - ssize_t cnt; - - cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset); - - if (cnt <= 0) { - if (cnt && !res) res = -1; - break; - } - - cnt = write (ofd, eio_buf, cnt); - - if (cnt <= 0) { - if (cnt && !res) res = -1; - break; - } - - offset += cnt; - res += cnt; - count -= cnt; - } - - free(eio_buf); - } - - return res; -} - -static oi_task* -queue_shift(pthread_mutex_t *lock, oi_queue *queue) -{ - oi_queue *last = NULL; - pthread_mutex_lock(lock); - if(!oi_queue_empty(queue)) { - last = oi_queue_last(queue); - oi_queue_remove(last); - } - pthread_mutex_unlock(lock); - - if(last == NULL) - return NULL; - - return oi_queue_data(last, oi_task, queue); -} - -#define P1(name,a) { \ - t->params.name.result = name( t->params.name.a ); \ - break; \ -} - -#define P2(name,a,b) { \ - t->params.name.result = name( t->params.name.a \ - , t->params.name.b \ - ); \ - break; \ -} - -#define P3(name,a,b,c) { \ - t->params.name.result = name( t->params.name.a \ - , t->params.name.b \ - , t->params.name.c \ - ); \ - break; \ -} - -#define P4(name,a,b,c,d) { \ - t->params.name.result = name( t->params.name.a \ - , t->params.name.b \ - , t->params.name.c \ - , t->params.name.d \ - ); \ - break; \ -} - -static void -execute_task(oi_task *t) -{ - errno = 0; - switch(t->type) { - case OI_TASK_OPEN: P3(open, pathname, flags, mode); - case OI_TASK_READ: P3(read, fd, buf, count); - case OI_TASK_WRITE: P3(write, fd, buf, count); - case OI_TASK_CLOSE: P1(close, fd); - case OI_TASK_SLEEP: P1(sleep, seconds); - case OI_TASK_SENDFILE: P4(eio__sendfile, out_fd, in_fd, offset, count); - case OI_TASK_GETADDRINFO: P4(getaddrinfo, nodename, servname, hints, res); - case OI_TASK_LSTAT: P2(lstat, path, buf); - default: - assert(0 && "unknown task type"); - break; - } - t->errorno = errno; -} - -static void -attempt_to_get_a_task(struct worker *worker) -{ - char dummy; - assert(readiness_pipe[0] > 0); - int r = read(readiness_pipe[0], &dummy, 1); - if(r == -1 && (errno != EAGAIN || errno != EINTR)) { - perror("read(readiness_pipe[0])"); - return; - } - - // 1 pop task from queue - assert(worker->task == NULL); - oi_task *task = queue_shift(&queue_lock, &waiting_tasks); - if(task == NULL) return; - worker->task = task; - - // 2 run task - execute_task(task); - - // 3 notify complition - oi_async *async = task->async; - assert(async != NULL); - pthread_mutex_lock(&finished_lock); - oi_queue_insert_head(&async->finished_tasks, &task->queue); - pthread_mutex_unlock(&finished_lock); - ev_async_send(async->loop, &async->watcher); - worker->task = NULL; - - /* attempt to pull another task */ - return attempt_to_get_a_task(worker); -} - -void * -worker_loop(void *data) -{ - int r; - struct worker *worker = data; - fd_set readfds; - FD_ZERO(&readfds); - FD_SET(readiness_pipe[0], &readfds); - - active_workers++; - assert(active_workers <= NWORKERS); - - while(1) { - r = select(1+readiness_pipe[0], &readfds, 0, 0, 0); - if(r == -1) break; - attempt_to_get_a_task(worker); - } - active_workers--; - - return NULL; -} - -static struct worker* -worker_new() -{ - int r; - struct worker *worker = calloc(sizeof(struct worker), 1); - if(worker == NULL ) { return NULL; } - - worker->task = NULL; - pthread_attr_setdetachstate(&worker->thread_attr, PTHREAD_CREATE_DETACHED); - - r = pthread_create( &worker->thread - , NULL // &worker->thread_attr - , worker_loop - , worker - ); - if(r != 0) { - /* TODO: error checking */ - perror("pthread_create"); - goto error; - } - - return worker; -error: - free(worker); - return NULL; -} - -static void -start_workers() -{ - assert(readiness_pipe[0] == -1); - assert(readiness_pipe[1] == -1); - assert(active_workers == 0); - - int r = pipe(readiness_pipe); - if(r < 0) { - perror("pipe()"); - assert(0 && "TODO HANDLE ME"); - } - - /* set the write end non-blocking */ - int flags = fcntl(readiness_pipe[1], F_GETFL, 0); - r = fcntl(readiness_pipe[1], F_SETFL, flags | O_NONBLOCK); - if(r < 0) { - assert(0 && "error setting pipe to non-blocking?"); - /* TODO error report */ - } - - oi_queue_init(&waiting_tasks); - - int i; - for(i = 0; i < NWORKERS; i++) - worker_new(); -} - -/* -static void -stop_workers() -{ - assert(0 && "TODO implement me"); -} -*/ - -static void -on_completion(struct ev_loop *loop, ev_async *watcher, int revents) -{ - oi_async *async = watcher->data; - oi_task *task; - - while((task = queue_shift(&finished_lock, &async->finished_tasks))) { - assert(task->active); - task->active = 0; - errno = task->errorno; -# define done_cb(kind) { \ - assert(task->params.kind.cb); \ - task->params.kind.cb(task, task->params.kind.result); \ - break; \ - } - switch(task->type) { - case OI_TASK_OPEN: done_cb(open); - case OI_TASK_READ: done_cb(read); - case OI_TASK_WRITE: done_cb(write); - case OI_TASK_CLOSE: done_cb(close); - case OI_TASK_SLEEP: done_cb(sleep); - case OI_TASK_SENDFILE: done_cb(eio__sendfile); - case OI_TASK_GETADDRINFO: done_cb(getaddrinfo); - case OI_TASK_LSTAT: done_cb(lstat); - } - /* the task is possibly freed by callback. do not access it again. */ - } -} - -void -oi_async_init (oi_async *async) -{ - ev_async_init(&async->watcher, on_completion); - - oi_queue_init(&async->finished_tasks); - oi_queue_init(&async->new_tasks); - - async->watcher.data = async; -} - -static void -dispatch_tasks(oi_async *async) -{ - while(!oi_queue_empty(&async->new_tasks)) { - oi_queue *last = oi_queue_last(&async->new_tasks); - oi_queue_remove(last); - oi_task *task = oi_queue_data(last, oi_task, queue); - - // 1. add task to task queue. - pthread_mutex_lock(&queue_lock); - oi_queue_insert_head(&waiting_tasks, &task->queue); - pthread_mutex_unlock(&queue_lock); - - // 2. write byte to pipe - char dummy; - int written = write(readiness_pipe[1], &dummy, 1); - - // 3. TODO make sure byte is written - assert(written == 1); - } -} - -void -oi_async_attach (struct ev_loop *loop, oi_async *async) -{ - if(active_watchers == 0 && active_workers == 0) - start_workers(); - active_watchers++; - - ev_async_start(loop, &async->watcher); - async->loop = loop; - - dispatch_tasks(async); -} - -void -oi_async_detach (oi_async *async) -{ - if(async->loop == NULL) - return; - ev_async_stop(async->loop, &async->watcher); - async->loop = NULL; - active_watchers--; - if(active_watchers == 0) { - //stop_workers(); - } -} - -void -oi_async_submit (oi_async *async, oi_task *task) -{ - assert(!task->active); - assert(task->async == NULL); - task->async = async; - task->active = 1; - - oi_queue_insert_head(&async->new_tasks, &task->queue); - if(ev_is_active(&async->watcher)) { - dispatch_tasks(async); - } -} - diff --git a/deps/liboi/oi_async.h b/deps/liboi/oi_async.h deleted file mode 100644 index f3ba937caf..0000000000 --- a/deps/liboi/oi_async.h +++ /dev/null @@ -1,218 +0,0 @@ -#include -#include -#include -#include - -#ifndef oi_async_h -#define oi_async_h -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct oi_async oi_async; -typedef struct oi_task oi_task; - -struct oi_async { - /* private */ - ev_async watcher; - struct ev_loop *loop; - - oi_queue finished_tasks; - oi_queue new_tasks; - - /* public */ - void *data; -}; - -typedef void (*oi_task_int_cb)(oi_task *, int result); -typedef void (*oi_task_uint_cb)(oi_task *, unsigned int result); -typedef void (*oi_task_ssize_cb)(oi_task *, ssize_t result); - -struct oi_task { - /* private */ - oi_async *async; - oi_queue queue; - int type; - union { - - struct { - const char *pathname; - int flags; - mode_t mode; - oi_task_int_cb cb; - int result; - } open; - - struct { - int fd; - void *buf; - size_t count; - oi_task_ssize_cb cb; - ssize_t result; - } read; - - struct { - int fd; - const void *buf; - size_t count; - oi_task_ssize_cb cb; - ssize_t result; - } write; - - struct { - int fd; - oi_task_int_cb cb; - int result; - } close; - - struct { - unsigned int seconds; - oi_task_uint_cb cb; - unsigned int result; - } sleep; - - struct { - int out_fd; - int in_fd; - off_t offset; - size_t count; - oi_task_ssize_cb cb; - ssize_t result; - } eio__sendfile; - - struct { - const char *nodename; /* restrict ? */ - const char *servname; /* restrict ? */ - struct addrinfo *hints; - struct addrinfo **res; /* restrict ? */ - oi_task_int_cb cb; - int result; - } getaddrinfo; - - struct { - const char *path; - struct stat *buf; - oi_task_int_cb cb; - int result; - } lstat; - - } params; - - /* read-only */ - volatile unsigned active:1; - int errorno; - - /* public */ - void *data; -}; - -void oi_async_init (oi_async *); -void oi_async_attach (struct ev_loop *loop, oi_async *); -void oi_async_detach (oi_async *); -void oi_async_submit (oi_async *, oi_task *); - -/* To submit a task for async processing - * (0) allocate memory for your task - * (1) initialize the task with one of the functions below - * (2) optionally set the task->data pointer - * (3) oi_async_submit() the task - */ - -enum { OI_TASK_OPEN - , OI_TASK_READ - , OI_TASK_WRITE - , OI_TASK_CLOSE - , OI_TASK_SLEEP - , OI_TASK_SENDFILE - , OI_TASK_GETADDRINFO - , OI_TASK_LSTAT - }; - -#define oi_task_init_common(task, _type) do {\ - (task)->active = 0;\ - (task)->async = NULL;\ - (task)->type = _type;\ -} while(0) - -static inline void -oi_task_init_open(oi_task *t, oi_task_int_cb cb, const char *pathname, int flags, mode_t mode) -{ - oi_task_init_common(t, OI_TASK_OPEN); - t->params.open.cb = cb; - t->params.open.pathname = pathname; - t->params.open.flags = flags; - t->params.open.mode = mode; -} - -static inline void -oi_task_init_read(oi_task *t, oi_task_ssize_cb cb, int fd, void *buf, size_t count) -{ - oi_task_init_common(t, OI_TASK_READ); - t->params.read.cb = cb; - t->params.read.fd = fd; - t->params.read.buf = buf; - t->params.read.count = count; -} - -static inline void -oi_task_init_write(oi_task *t, oi_task_ssize_cb cb, int fd, const void *buf, size_t count) -{ - oi_task_init_common(t, OI_TASK_WRITE); - t->params.write.cb = cb; - t->params.write.fd = fd; - t->params.write.buf = buf; - t->params.write.count = count; -} - -static inline void -oi_task_init_close(oi_task *t, oi_task_int_cb cb, int fd) -{ - oi_task_init_common(t, OI_TASK_CLOSE); - t->params.close.cb = cb; - t->params.close.fd = fd; -} - -static inline void -oi_task_init_sleep(oi_task *t, oi_task_uint_cb cb, unsigned int seconds) -{ - oi_task_init_common(t, OI_TASK_SLEEP); - t->params.sleep.cb = cb; - t->params.sleep.seconds = seconds; -} - -static inline void -oi_task_init_sendfile(oi_task *t, oi_task_ssize_cb cb, int out_fd, int in_fd, off_t offset, size_t count) -{ - oi_task_init_common(t, OI_TASK_SENDFILE); - t->params.eio__sendfile.cb = cb; - t->params.eio__sendfile.out_fd = out_fd; - t->params.eio__sendfile.in_fd = in_fd; - t->params.eio__sendfile.offset = offset; - t->params.eio__sendfile.count = count; -} - -static inline void -oi_task_init_getaddrinfo(oi_task *t, oi_task_int_cb cb, const char *node, - const char *service, struct addrinfo *hints, struct addrinfo **res) -{ - oi_task_init_common(t, OI_TASK_GETADDRINFO); - t->params.getaddrinfo.cb = cb; - t->params.getaddrinfo.nodename = node; - t->params.getaddrinfo.servname = service; - t->params.getaddrinfo.hints = hints; - t->params.getaddrinfo.res = res; -} - -static inline void -oi_task_init_lstat(oi_task *t, oi_task_int_cb cb, const char *path, struct stat *buf) -{ - oi_task_init_common(t, OI_TASK_LSTAT); - t->params.lstat.cb = cb; - t->params.lstat.path = path; - t->params.lstat.buf = buf; -} - -#ifdef __cplusplus -} -#endif -#endif /* oi_async_h */ diff --git a/deps/liboi/oi_error.h b/deps/liboi/oi_error.h index 0f8730f5b5..571a5701fd 100644 --- a/deps/liboi/oi_error.h +++ b/deps/liboi/oi_error.h @@ -4,18 +4,21 @@ extern "C" { #endif +enum oi_error_domain + { OI_ERROR_GNUTLS + , OI_ERROR_EV + , OI_ERROR_CLOSE + , OI_ERROR_SHUTDOWN + , OI_ERROR_OPEN + , OI_ERROR_SEND + , OI_ERROR_RECV + , OI_ERROR_WRITE + , OI_ERROR_READ + , OI_ERROR_SENDFILE + }; + struct oi_error { - enum { OI_ERROR_GNUTLS - , OI_ERROR_EV - , OI_ERROR_CLOSE - , OI_ERROR_SHUTDOWN - , OI_ERROR_OPEN - , OI_ERROR_SEND - , OI_ERROR_RECV - , OI_ERROR_WRITE - , OI_ERROR_READ - , OI_ERROR_SENDFILE - } domain; + enum oi_error_domain domain; int code; /* errno */ }; diff --git a/deps/liboi/oi_file.c b/deps/liboi/oi_file.c deleted file mode 100644 index 1073b0852a..0000000000 --- a/deps/liboi/oi_file.c +++ /dev/null @@ -1,391 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include -#include - -#define RELEASE_BUF(buf) if(buf->release) { buf->release(buf); } -#define DRAIN_CB(file) if(file->on_drain) { file->on_drain(file); } -#define RAISE_ERROR(s, _domain, _code) do { \ - if(s->on_error) { \ - struct oi_error __oi_error; \ - __oi_error.domain = _domain; \ - __oi_error.code = _code; \ - s->on_error(s, __oi_error); \ - } \ -} while(0) \ - -/* forwards */ -static void dispatch_write_buf (oi_file *file); -static void maybe_do_read (oi_file *file); - -static void -after_read(oi_task *task, ssize_t recved) -{ - oi_file *file = task->data; - - if(recved == -1) { - RAISE_ERROR(file, OI_ERROR_READ, errno); - return; - } - - if(recved == 0) - oi_file_read_stop(file); - - if(file->on_read) - file->on_read(file, recved); - - maybe_do_read(file); -} - -static void -maybe_do_read(oi_file *file) -{ - if ( file->read_buffer == NULL - || file->write_buf != NULL - || file->write_socket != NULL - || !oi_queue_empty(&file->write_queue) - || file->io_task.active - ) return; - - assert(file->fd > 0); - - oi_task_init_read ( &file->io_task - , after_read - , file->fd - , file->read_buffer - , file->read_buffer_size - ); - file->io_task.data = file; - oi_async_submit(&file->async, &file->io_task); -} - -static void -submit_read (oi_file *file) -{ -} - -int -oi_file_init (oi_file *file) -{ - oi_async_init(&file->async); - file->async.data = file; - - oi_queue_init(&file->write_queue); - - file->fd = -1; - file->loop = NULL; - file->write_buf = NULL; - file->read_buffer = NULL; - - file->on_open = NULL; - file->on_read = NULL; - file->on_drain = NULL; - file->on_error = NULL; - file->on_close = NULL; - return 0; -} - -void -oi_file_read_start (oi_file *file, void *buffer, size_t bufsize) -{ - file->read_buffer = buffer; - file->read_buffer_size = bufsize; - maybe_do_read(file); -} - -void -oi_file_read_stop (oi_file *file) -{ - file->read_buffer = NULL; -} - -void -oi_api_free_buf_with_heap_base(oi_buf *buf) -{ - free(buf->base); - free(buf); -} - -static void -after_open(oi_task *task, int result) -{ - oi_file *file = task->data; - - if(result == -1) { - RAISE_ERROR(file, OI_ERROR_OPEN, errno); - return; - } - - file->fd = result; - - if(file->on_open) { - file->on_open(file); - } - - maybe_do_read(file); -} - -int -oi_file_open_path (oi_file *file, const char *path, int flags, mode_t mode) -{ - if(file->fd >= 0) - return -1; - oi_task_init_open( &file->io_task - , after_open - , path - , flags - , mode - ); - file->io_task.data = file; - oi_async_submit(&file->async, &file->io_task); - return 0; -} - -int -oi_file_open_stdin (oi_file *file) -{ - if(file->fd >= 0) - return -1; - file->fd = STDIN_FILENO; - if(file->on_open) - file->on_open(file); - return 0; -} - -int -oi_file_open_stdout (oi_file *file) -{ - if(file->fd >= 0) - return -1; - file->fd = STDOUT_FILENO; - if(file->on_open) - file->on_open(file); - return 0; -} - -int -oi_file_open_stderr (oi_file *file) -{ - if(file->fd >= 0) - return -1; - file->fd = STDERR_FILENO; - if(file->on_open) - file->on_open(file); - return 0; -} - -void -oi_file_attach (oi_file *file, struct ev_loop *loop) -{ - oi_async_attach (loop, &file->async); - file->loop = loop; -} - -void -oi_file_detach (oi_file *file) -{ - oi_async_detach (&file->async); - file->loop = NULL; -} - -static void -after_write(oi_task *task, ssize_t result) -{ - oi_file *file = task->data; - - if(result == -1) { - RAISE_ERROR(file, OI_ERROR_WRITE, errno); - return; - } - - assert(file->write_buf != NULL); - oi_buf *buf = file->write_buf; - - buf->written += result; - if(buf->written < buf->len) { - oi_task_init_write ( &file->io_task - , after_write - , file->fd - , buf->base + buf->written - , buf->len - buf->written - ); - file->io_task.data = file; - oi_async_submit(&file->async, &file->io_task); - return; - } - - assert(buf->written == buf->len); - - RELEASE_BUF(file->write_buf); - file->write_buf = NULL; - - if(oi_queue_empty(&file->write_queue)) { - DRAIN_CB(file); - maybe_do_read(file); - } else { - dispatch_write_buf(file); - } - - return; -} - -static void -dispatch_write_buf(oi_file *file) -{ - if(file->write_buf != NULL) - return; - if(oi_queue_empty(&file->write_queue)) - return; - - oi_queue *q = oi_queue_last(&file->write_queue); - oi_queue_remove(q); - oi_buf *buf = file->write_buf = oi_queue_data(q, oi_buf, queue); - - assert(!file->io_task.active); - oi_task_init_write ( &file->io_task - , after_write - , file->fd - , buf->base + buf->written - , buf->len - buf->written - ); - file->io_task.data = file; - oi_async_submit(&file->async, &file->io_task); -} - -int -oi_file_write (oi_file *file, oi_buf *buf) -{ - if(file->fd < 0) - return -1; - if(file->read_buffer) - return -2; - /* TODO better business check*/ - - buf->written = 0; - oi_queue_insert_head(&file->write_queue, &buf->queue); - dispatch_write_buf(file); - - return 0; -} - -// Writes a string to the file. -// NOTE: Allocates memory. Avoid for performance applications. -int -oi_file_write_simple (oi_file *file, const char *str, size_t len) -{ - if(file->fd < 0) - return -1; - if(file->read_buffer) - return -2; - /* TODO better business check*/ - - oi_buf *buf = malloc(sizeof(oi_buf)); - buf->base = malloc(len); - memcpy(buf->base, str, len); - buf->len = len; - buf->release = oi_api_free_buf_with_heap_base; - - oi_file_write(file, buf); - return 0; -} - -static void -clear_write_queue(oi_file *file) -{ - while(!oi_queue_empty(&file->write_queue)) { - oi_queue *q = oi_queue_last(&file->write_queue); - oi_queue_remove(q); - oi_buf *buf = oi_queue_data(q, oi_buf, queue); - RELEASE_BUF(buf); - } -} - -static void -after_close(oi_task *task, int result) -{ - oi_file *file = task->data; - - assert(oi_queue_empty(&file->write_queue)); - - if(result == -1) { - RAISE_ERROR(file, OI_ERROR_CLOSE, errno); - return; - // TODO try to close again? - } - - file->fd = -1; - // TODO deinit task_queue, detach thread_pool_result_watcher - - if(file->on_close) { - file->on_close(file); - } - - return; -} - -void -oi_file_close (oi_file *file) -{ - assert(file->fd >= 0 && "file not open!"); - clear_write_queue(file); - oi_task_init_close ( &file->io_task - , after_close - , file->fd - ); - file->io_task.data = file; - oi_async_submit(&file->async, &file->io_task); -} - -static void -after_sendfile(oi_task *task, ssize_t sent) -{ - oi_file *file = task->data; - oi_socket *socket = file->write_socket; - assert(socket != NULL); - file->write_socket = NULL; - - if(sent == -1) { - RAISE_ERROR(file, OI_ERROR_SENDFILE, errno); - return; - } - - if(socket->on_drain) { - socket->on_drain(socket); - } - - maybe_do_read(file); -} - -int -oi_file_send (oi_file *file, oi_socket *destination, off_t offset, size_t count) -{ - if(file->fd < 0) - return -1; - if(file->read_buffer) - return -2; - /* TODO better business check*/ - - assert(file->write_socket == NULL); - // (1) make sure the write queue on the socket is cleared. - // - // (2) - // - file->write_socket = destination; - oi_task_init_sendfile ( &file->io_task - , after_sendfile - , destination->fd - , file->fd - , offset - , count - ); - file->io_task.data = file; - oi_async_submit(&file->async, &file->io_task); - - return 0; -} - diff --git a/deps/liboi/oi_file.h b/deps/liboi/oi_file.h deleted file mode 100644 index 6cd917ea4c..0000000000 --- a/deps/liboi/oi_file.h +++ /dev/null @@ -1,58 +0,0 @@ -#include -#include -#ifdef __cplusplus -extern "C" { -#endif - -#ifndef oi_file_h -#define oi_file_h - -typedef struct oi_file oi_file; - -int oi_file_init (oi_file *); - -void oi_file_attach (oi_file *, struct ev_loop *); -void oi_file_detach (oi_file *); - -/* WARNING oi_file_open_path: path argument must be valid until oi_file - * object is closed and the on_close() callback is made. oi does not strdup - * the path pointer. */ -int oi_file_open_path (oi_file *, const char *path, int flags, mode_t mode); -int oi_file_open_stdin (oi_file *); -int oi_file_open_stdout (oi_file *); -int oi_file_open_stderr (oi_file *); - -void oi_file_read_start (oi_file *, void *buffer, size_t bufsize); -void oi_file_read_stop (oi_file *); -int oi_file_write (oi_file *, oi_buf *to_write); -int oi_file_write_simple (oi_file *, const char *, size_t); -int oi_file_send (oi_file *source, oi_socket *destination, off_t offset, size_t length); -void oi_file_close (oi_file *); - -struct oi_file { - /* private */ - oi_async async; - oi_task io_task; - struct ev_loop *loop; - oi_queue write_queue; - oi_buf *write_buf; /* TODO this pointer is unnecessary - remove and just look at first element of the queue */ - oi_socket *write_socket; - void *read_buffer; - size_t read_buffer_size; - - /* read-only */ - int fd; - - /* public */ - void (*on_open) (oi_file *); - void (*on_read) (oi_file *, size_t count); - void (*on_drain) (oi_file *); - void (*on_error) (oi_file *, struct oi_error); - void (*on_close) (oi_file *); - void *data; -}; - -#ifdef __cplusplus -} -#endif -#endif /* oi_file_h */ diff --git a/deps/liboi/oi_socket.c b/deps/liboi/oi_socket.c index f922d23890..57dd3fa95d 100644 --- a/deps/liboi/oi_socket.c +++ b/deps/liboi/oi_socket.c @@ -11,6 +11,14 @@ #include #include +#if EV_MULTIPLICITY +# define SOCKET_LOOP_ socket->loop, +# define SERVER_LOOP_ server->loop, +#else +# define SOCKET_LOOP_ +# define SERVER_LOOP_ +#endif // EV_MULTIPLICITY + #if HAVE_GNUTLS # include # define GNUTLS_NEED_WRITE (gnutls_record_get_direction(socket->session) == 1) @@ -49,8 +57,8 @@ full_close(oi_socket *socket) socket->read_action = NULL; socket->write_action = NULL; - if(socket->loop) { - ev_feed_event(socket->loop, &socket->read_watcher, EV_READ); + if(socket->attached) { + ev_feed_event(SOCKET_LOOP_ &socket->read_watcher, EV_READ); } return OKAY; } @@ -88,7 +96,7 @@ update_write_buffer_after_send(oi_socket *socket, ssize_t sent) } if(oi_queue_empty(&socket->out_stream)) { - ev_io_stop(socket->loop, &socket->write_watcher); + ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); if(socket->on_drain) socket->on_drain(socket); } @@ -160,7 +168,7 @@ secure_socket_send(oi_socket *socket) ssize_t sent; if(oi_queue_empty(&socket->out_stream)) { - ev_io_stop(socket->loop, &socket->write_watcher); + ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); return AGAIN; } @@ -337,7 +345,7 @@ socket_send(oi_socket *socket) assert(socket->secure == FALSE); if(oi_queue_empty(&socket->out_stream)) { - ev_io_stop(socket->loop, &socket->write_watcher); + ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); return AGAIN; } @@ -477,14 +485,16 @@ assign_file_descriptor(oi_socket *socket, int fd) * Called by server->connection_watcher. */ static void -on_connection(struct ev_loop *loop, ev_io *watcher, int revents) +on_connection(EV_P_ ev_io *watcher, int revents) { oi_server *server = watcher->data; // printf("on connection!\n"); assert(server->listening); +#if EV_MULTIPLICITY assert(server->loop == loop); +#endif assert(&server->connection_watcher == watcher); if(EV_ERROR & revents) { @@ -524,7 +534,7 @@ on_connection(struct ev_loop *loop, ev_io *watcher, int revents) socket->server = server; assign_file_descriptor(socket, fd); - oi_socket_attach(socket, loop); + oi_socket_attach(EV_A_ socket); } int @@ -595,23 +605,30 @@ oi_server_close(oi_server *server) } void -oi_server_attach (oi_server *server, struct ev_loop *loop) +oi_server_attach (EV_P_ oi_server *server) { - ev_io_start (loop, &server->connection_watcher); + ev_io_start (EV_A_ &server->connection_watcher); +#if EV_MULTIPLICITY server->loop = loop; +#endif + server->attached = TRUE; } void oi_server_detach (oi_server *server) { - ev_io_stop (server->loop, &server->connection_watcher); + ev_io_stop (SERVER_LOOP_ &server->connection_watcher); +#if EV_MULTIPLICITY server->loop = NULL; +#endif + server->attached = FALSE; } void oi_server_init(oi_server *server, int backlog) { server->backlog = backlog; + server->attached = FALSE; server->listening = FALSE; server->fd = -1; server->connection_watcher.data = server; @@ -624,7 +641,7 @@ oi_server_init(oi_server *server, int backlog) /* Internal callback. called by socket->timeout_watcher */ static void -on_timeout(struct ev_loop *loop, ev_timer *watcher, int revents) +on_timeout(EV_P_ ev_timer *watcher, int revents) { oi_socket *socket = watcher->data; @@ -652,7 +669,7 @@ release_write_buffer(oi_socket *socket) /* Internal callback. called by socket->read_watcher */ static void -on_io_event(struct ev_loop *loop, ev_io *watcher, int revents) +on_io_event(EV_P_ ev_io *watcher, int revents) { oi_socket *socket = watcher->data; @@ -698,9 +715,9 @@ on_io_event(struct ev_loop *loop, ev_io *watcher, int revents) close: release_write_buffer(socket); - ev_clear_pending (socket->loop, &socket->write_watcher); - ev_clear_pending (socket->loop, &socket->read_watcher); - ev_clear_pending (socket->loop, &socket->timeout_watcher); + ev_clear_pending (EV_A_ &socket->write_watcher); + ev_clear_pending (EV_A_ &socket->read_watcher); + ev_clear_pending (EV_A_ &socket->timeout_watcher); oi_socket_detach(socket); @@ -721,7 +738,10 @@ oi_socket_init(oi_socket *socket, float timeout) { socket->fd = -1; socket->server = NULL; +#if EV_MULTIPLICITY socket->loop = NULL; +#endif + socket->attached = FALSE; socket->connected = FALSE; oi_queue_init(&socket->out_stream); @@ -765,8 +785,8 @@ oi_socket_write_eof (oi_socket *socket) ) { socket->write_action = secure_half_goodbye; - if(socket->loop) - ev_io_start(socket->loop, &socket->write_watcher); + if(socket->attached) + ev_io_start(SOCKET_LOOP_ &socket->write_watcher); return; } /* secure servers cannot handle half-closed connections? */ @@ -799,8 +819,8 @@ oi_socket_close (oi_socket *socket) socket->read_action = NULL; } - if(socket->loop) - ev_io_start(socket->loop, &socket->write_watcher); + if(socket->attached) + ev_io_start(SOCKET_LOOP_ &socket->write_watcher); return; } @@ -815,7 +835,7 @@ oi_socket_close (oi_socket *socket) void oi_socket_reset_timeout(oi_socket *socket) { - ev_timer_again(socket->loop, &socket->timeout_watcher); + ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher); } /** @@ -831,7 +851,8 @@ oi_socket_write(oi_socket *socket, oi_buf *buf) oi_queue_insert_head(&socket->out_stream, &buf->queue); buf->written = 0; - ev_io_start(socket->loop, &socket->write_watcher); + // XXX if (socket->attached) ?? + ev_io_start(SOCKET_LOOP_ &socket->write_watcher); } static void @@ -856,45 +877,51 @@ oi_socket_write_simple(oi_socket *socket, const char *str, size_t len) } void -oi_socket_attach(oi_socket *socket, struct ev_loop *loop) +oi_socket_attach(EV_P_ oi_socket *socket) { +#if EV_MULTIPLICITY socket->loop = loop; +#endif + socket->attached = TRUE; - ev_timer_again(loop, &socket->timeout_watcher); + ev_timer_again(EV_A_ &socket->timeout_watcher); if(socket->read_action) - ev_io_start(loop, &socket->read_watcher); + ev_io_start(EV_A_ &socket->read_watcher); if(socket->write_action) - ev_io_start(loop, &socket->write_watcher); + ev_io_start(EV_A_ &socket->write_watcher); /* make sure the io_event happens soon in the case we're being reattached */ - ev_feed_event(loop, &socket->read_watcher, EV_READ); + ev_feed_event(EV_A_ &socket->read_watcher, EV_READ); } void oi_socket_detach(oi_socket *socket) { - if(socket->loop) { - ev_io_stop(socket->loop, &socket->write_watcher); - ev_io_stop(socket->loop, &socket->read_watcher); - ev_timer_stop(socket->loop, &socket->timeout_watcher); + if(socket->attached) { + ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); + ev_io_stop(SOCKET_LOOP_ &socket->read_watcher); + ev_timer_stop(SOCKET_LOOP_ &socket->timeout_watcher); +#if EV_MULTIPLICITY socket->loop = NULL; +#endif + socket->attached = FALSE; } } void oi_socket_read_stop (oi_socket *socket) { - ev_io_stop(socket->loop, &socket->read_watcher); - ev_clear_pending (socket->loop, &socket->read_watcher); + ev_io_stop(SOCKET_LOOP_ &socket->read_watcher); + ev_clear_pending (SOCKET_LOOP_ &socket->read_watcher); } void oi_socket_read_start (oi_socket *socket) { if(socket->read_action) { - ev_io_start(socket->loop, &socket->read_watcher); + ev_io_start(SOCKET_LOOP_ &socket->read_watcher); /* XXX feed event? */ } } diff --git a/deps/liboi/oi_socket.h b/deps/liboi/oi_socket.h index 1b51f0f21c..2298c633b1 100644 --- a/deps/liboi/oi_socket.h +++ b/deps/liboi/oi_socket.h @@ -22,14 +22,14 @@ typedef struct oi_socket oi_socket; void oi_server_init (oi_server *, int backlog); int oi_server_listen (oi_server *, struct addrinfo *addrinfo); -void oi_server_attach (oi_server *, struct ev_loop *loop); +void oi_server_attach (EV_P_ oi_server *); void oi_server_detach (oi_server *); void oi_server_close (oi_server *); void oi_socket_init (oi_socket *, float timeout); int oi_socket_pair (oi_socket *a, oi_socket *b); /* TODO */ int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo); -void oi_socket_attach (oi_socket *, struct ev_loop *loop); +void oi_socket_attach (EV_P_ oi_socket *); void oi_socket_detach (oi_socket *); void oi_socket_read_start (oi_socket *); void oi_socket_read_stop (oi_socket *); @@ -46,7 +46,10 @@ struct oi_server { /* read only */ int fd; int backlog; +#if EV_MULTIPLICITY struct ev_loop *loop; +#endif + unsigned attached:1; unsigned listening:1; /* private */ @@ -61,10 +64,13 @@ struct oi_server { struct oi_socket { /* read only */ int fd; +#if EV_MULTIPLICITY struct ev_loop *loop; +#endif oi_server *server; oi_queue out_stream; size_t written; + unsigned attached:1; unsigned connected:1; unsigned secure:1; unsigned wait_for_secure_hangup:1; diff --git a/deps/liboi/test/connection_interruption.c b/deps/liboi/test/connection_interruption.c index 0b9169a9d1..7dd659194c 100644 --- a/deps/liboi/test/connection_interruption.c +++ b/deps/liboi/test/connection_interruption.c @@ -86,7 +86,6 @@ int main(int argc, const char *argv[]) { int r; - struct ev_loop *loop = ev_default_loop(0); oi_server_init(&server, 1000); server.on_connection = on_server_connection; @@ -126,7 +125,7 @@ main(int argc, const char *argv[]) #endif oi_server_listen(&server, servinfo); - oi_server_attach(&server, loop); + oi_server_attach(EV_DEFAULT_ &server); int i; for(i = 0; i < NCONN; i++) { @@ -144,10 +143,10 @@ main(int argc, const char *argv[]) #endif r = oi_socket_connect(client, servinfo); assert(r == 0 && "problem connecting"); - oi_socket_attach(client, loop); + oi_socket_attach(EV_DEFAULT_ client); } - ev_loop(loop, 0); + ev_loop(EV_DEFAULT_ 0); assert(nconnections == NCONN); diff --git a/deps/liboi/test/echo.c b/deps/liboi/test/echo.c index c68b94cf34..69cd430ea4 100644 --- a/deps/liboi/test/echo.c +++ b/deps/liboi/test/echo.c @@ -44,7 +44,6 @@ int main(int argc, const char *argv[]) { int r; - struct ev_loop *loop = ev_default_loop(0); oi_server server; oi_socket client; @@ -90,9 +89,9 @@ main(int argc, const char *argv[]) #endif r = oi_server_listen(&server, servinfo); assert(r == 0); - oi_server_attach(&server, loop); + oi_server_attach(EV_DEFAULT_ &server); - ev_loop(loop, 0); + ev_loop(EV_DEFAULT_ 0); #if TCP freeaddrinfo(servinfo); diff --git a/deps/liboi/test/fancy_copy.c b/deps/liboi/test/fancy_copy.c deleted file mode 100644 index f5d2ba7194..0000000000 --- a/deps/liboi/test/fancy_copy.c +++ /dev/null @@ -1,223 +0,0 @@ -/* This test uses most of oi's facilities. It starts by opening a new file - * and writing N characters to it. Once the first chunk is written, it opens - * that file with another handle - * - * /tmp/oi_test_src == /tmp/oi_test_dst - * | ^ - * | | - * stream | | write - * | | - * V | - * client ----------> server/connection - */ -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#ifndef TRUE -# define TRUE 1 -#endif -#ifndef FALSE -# define FALSE 0 -#endif - -#include -#include - -#define PORT "5555" - -static struct ev_loop *loop; -static oi_file file_src; -static oi_file file_dst; -static oi_socket client; -static oi_server server; -static oi_socket connection; -static int got_connection = 0; - -static char *src_filename; -static char *dst_filename; - -static void -file_error(oi_file *_) -{ - assert(0); -} - -static void -on_file_dst_open (oi_file *_) -{ - assert(connection.fd > 0); - oi_socket_read_start(&connection); - //printf("file_dst is open\n"); -} - -static void -on_file_dst_close (oi_file *_) -{ - //printf("file dst closed\n"); - oi_file_detach(&file_dst); -} - -static void -on_connection_read (oi_socket *_, const void *buf, size_t count) -{ - assert(file_dst.fd > 0); - if(count == 0) { - file_dst.on_drain = oi_file_close; - oi_socket_close(&connection); - } else { - oi_file_write_simple(&file_dst, buf, count); - } -} - -static void -on_connection_connect (oi_socket *_) -{ - oi_file_init(&file_dst); - file_dst.on_open = on_file_dst_open; - file_dst.on_close = on_file_dst_close; - oi_file_open_path(&file_dst, dst_filename, O_WRONLY | O_CREAT, 0644); - oi_file_attach(&file_dst, loop); - - oi_socket_read_stop(&connection); -} - -static void -on_connection_timeout (oi_socket *_) -{ - assert(0); -} - -static void -on_connection_error (oi_socket *_, struct oi_error e) -{ - assert(0); -} - -static void -on_connection_close (oi_socket *_) -{ - oi_server_close(&server); - oi_server_detach(&server); -} - -static oi_socket* -on_server_connection(oi_server *_, struct sockaddr *addr, socklen_t len) -{ - assert(got_connection == FALSE); - oi_socket_init(&connection, 5.0); - connection.on_connect = on_connection_connect; - connection.on_read = on_connection_read; - connection.on_error = on_connection_error; - connection.on_close = on_connection_close; - connection.on_timeout = on_connection_timeout; - connection.on_drain = oi_socket_close; - got_connection = TRUE; - //printf("on server connection\n"); - return &connection; -} - -static void -on_client_read (oi_socket *_, const void *buf, size_t count) -{ - assert(0); -} - -static void -on_client_error (oi_socket *_, struct oi_error e) -{ - assert(0); -} - -static void -on_client_connect (oi_socket *_) -{ - if(file_src.fd > 0) { - oi_file_send(&file_src, &client, 0, 50*1024); - } -} - -static void -on_client_drain (oi_socket *_) -{ - oi_socket_close(&client); - oi_file_close(&file_src); -} - -static void -on_file_src_open (oi_file *_) -{ - if(client.fd > 0) { - oi_file_send(&file_src, &client, 0, 50*1024); - } -} - -static void -on_client_timeout (oi_socket *_) -{ - assert(0); -} - -int -main(int argc, char *argv[]) -{ - int r; - loop = ev_default_loop(0); - - assert(argc == 3); - src_filename = argv[1]; - dst_filename = argv[2]; - - assert(strlen(src_filename) > 0); - assert(strlen(dst_filename) > 0); - - oi_server_init(&server, 10); - server.on_connection = on_server_connection; - - struct addrinfo *servinfo; - struct addrinfo hints; - memset(&hints, 0, sizeof hints); - - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - r = getaddrinfo(NULL, PORT, &hints, &servinfo); - assert(r == 0); - - r = oi_server_listen(&server, servinfo); - assert(r >= 0 && "problem listening"); - oi_server_attach(&server, loop); - - oi_socket_init(&client, 5.0); - client.on_read = on_client_read; - client.on_error = on_client_error; - client.on_connect = on_client_connect; - client.on_timeout = on_client_timeout; - //client.on_close = oi_socket_detach; - client.on_drain = on_client_drain; - r = oi_socket_connect(&client, servinfo); - assert(r == 0 && "problem connecting"); - oi_socket_attach(&client, loop); - - oi_file_init(&file_src); - file_src.on_open = on_file_src_open; - file_src.on_drain = file_error; - file_src.on_close = oi_file_detach; - oi_file_open_path(&file_src, src_filename, O_RDONLY, 0700); - oi_file_attach(&file_src, loop); - - ev_loop(loop, 0); - - printf("\noi_file: %d bytes\n", sizeof(oi_file)); - printf("oi_socket: %d bytes\n", sizeof(oi_socket)); - - return 0; -} - diff --git a/deps/liboi/test/ping_pong.c b/deps/liboi/test/ping_pong.c index 93199eaa42..26b3f689d2 100644 --- a/deps/liboi/test/ping_pong.c +++ b/deps/liboi/test/ping_pong.c @@ -30,7 +30,7 @@ static void on_client_close(oi_socket *socket) { //printf("client connection closed\n"); - ev_unloop(socket->loop, EVUNLOOP_ALL); + ev_unloop(EV_DEFAULT_ EVUNLOOP_ALL); } static oi_socket* @@ -87,7 +87,6 @@ int main(int argc, const char *argv[]) { int r; - struct ev_loop *loop = ev_default_loop(0); oi_server server; oi_socket client; @@ -133,7 +132,7 @@ main(int argc, const char *argv[]) #endif r = oi_server_listen(&server, servinfo); assert(r == 0); - oi_server_attach(&server, loop); + oi_server_attach(EV_DEFAULT_ &server); oi_socket_init(&client, 5.0); client.on_read = on_client_read; @@ -150,9 +149,9 @@ main(int argc, const char *argv[]) r = oi_socket_connect(&client, servinfo); assert(r == 0 && "problem connecting"); - oi_socket_attach(&client, loop); + oi_socket_attach(EV_DEFAULT_ &client); - ev_loop(loop, 0); + ev_loop(EV_DEFAULT_ 0); assert(successful_ping_count == EXCHANGES + 1); assert(nconnections == 1); diff --git a/deps/liboi/test/sleeping_tasks.c b/deps/liboi/test/sleeping_tasks.c deleted file mode 100644 index d9aeadb799..0000000000 --- a/deps/liboi/test/sleeping_tasks.c +++ /dev/null @@ -1,54 +0,0 @@ -#include /* sleep() */ -#include /* malloc(), free() */ -#include -#include -#include - -#define SLEEPS 4 -static int runs = 0; - -static void -done (oi_task *task, unsigned int result) -{ - assert(result == 0); - if(++runs == SLEEPS) { - ev_timer *timer = task->data; - ev_timer_stop(task->async->loop, timer); - oi_async_detach(task->async); - } - free(task); -} - -static void -on_timeout(struct ev_loop *loop, ev_timer *w, int events) -{ - assert(0 && "timeout before all sleeping tasks were complete!"); -} - -int -main() -{ - struct ev_loop *loop = ev_default_loop(0); - oi_async async; - ev_timer timer; - int i; - - oi_async_init(&async); - for(i = 0; i < SLEEPS; i++) { - oi_task *task = malloc(sizeof(oi_task)); - oi_task_init_sleep(task, done, 1); - task->data = &timer; - oi_async_submit(&async, task); - } - oi_async_attach(loop, &async); - - - ev_timer_init (&timer, on_timeout, 1.2, 0.); - ev_timer_start (loop, &timer); - - ev_loop(loop, 0); - - assert(runs == SLEEPS); - - return 0; -} diff --git a/deps/liboi/test/stdout.c b/deps/liboi/test/stdout.c deleted file mode 100644 index 1bb2469406..0000000000 --- a/deps/liboi/test/stdout.c +++ /dev/null @@ -1,64 +0,0 @@ -#include /* sleep() */ -#include /* malloc(), free() */ -#include -#include -#include - -#include -#include -#include - -#include -#include - -static oi_file file; -static oi_file out; - -#define READ_BUFSIZE (5) -static char read_buf[READ_BUFSIZE]; -# define SEP "~~~~~~~~~~~~~~~~~~~~~~\n" - -static void -on_open(oi_file *f) -{ - oi_file_write_simple(&out, "\n", 1); - oi_file_write_simple(&out, SEP, sizeof(SEP)); - oi_file_read_start(f, read_buf, READ_BUFSIZE); -} - -static void -on_close(oi_file *f) -{ - oi_file_write_simple(&out, SEP, sizeof(SEP)); - oi_file_detach(f); - out.on_drain = oi_file_detach; -} - -static void -on_read(oi_file *f, size_t recved) -{ - if(recved == 0) /* EOF */ - oi_file_close(f); - else - oi_file_write_simple(&out, read_buf, recved); -} - -int -main() -{ - struct ev_loop *loop = ev_default_loop(0); - - oi_file_init(&file); - file.on_open = on_open; - file.on_read = on_read; - file.on_close = on_close; - oi_file_open_path(&file, "LICENSE", O_RDONLY, 0); - oi_file_attach(&file, loop); - - oi_file_init(&out); - oi_file_open_stdout(&out); - oi_file_attach(&out, loop); - - ev_loop(loop, 0); - return 0; -} diff --git a/src/http.cc b/src/http.cc index 131f39d71c..101b185c5e 100644 --- a/src/http.cc +++ b/src/http.cc @@ -588,7 +588,7 @@ HttpServer::Start(struct addrinfo *servinfo) { int r = oi_server_listen(&server, servinfo); if(r == 0) - oi_server_attach(&server, node_loop()); + oi_server_attach(EV_DEFAULT_UC_ &server); return r; } diff --git a/src/net.cc b/src/net.cc index d8db803baa..5f83f34f1d 100644 --- a/src/net.cc +++ b/src/net.cc @@ -159,7 +159,7 @@ Server::ListenTCP (const Arguments& args) r = oi_server_listen(&server->server_, address); if (r != 0) return ThrowException(String::New("Error listening on port")); - oi_server_attach(&server->server_, node_loop()); + oi_server_attach(EV_DEFAULT_UC_ &server->server_); freeaddrinfo(address); @@ -349,7 +349,7 @@ Socket::AfterResolve (eio_req *req) // no error. return. if(r == 0 && req->result == 0) { - oi_socket_attach (&socket->socket_, node_loop()); + oi_socket_attach (EV_DEFAULT_UC_ &socket->socket_); return 0; } diff --git a/src/node.cc b/src/node.cc index bfb072b4d0..002dc71d2d 100644 --- a/src/node.cc +++ b/src/node.cc @@ -129,14 +129,14 @@ void node_fatal_exception (TryCatch &try_catch) { ReportException(&try_catch); - ev_unloop(node_loop(), EVUNLOOP_ALL); + ev_unloop(EV_DEFAULT_UC_ EVUNLOOP_ALL); exit_code = 1; } void node_exit (int code) { exit_code = code; - ev_unloop(node_loop(), EVUNLOOP_ALL); + ev_unloop(EV_DEFAULT_UC_ EVUNLOOP_ALL); } @@ -170,6 +170,8 @@ node_eio_warmup (void) int main (int argc, char *argv[]) { + ev_default_loop(EVFLAG_AUTO); // initialize the default ev loop. + // start eio thread pool ev_async_init(&thread_pool_watcher, thread_pool_cb); eio_init(thread_pool_want_poll, NULL); @@ -220,7 +222,7 @@ main (int argc, char *argv[]) ExecuteString(String::New(native_main), String::New("main.js")); if (try_catch.HasCaught()) goto native_js_error; - ev_loop(node_loop(), 0); + ev_loop(EV_DEFAULT_UC_ 0); context.Dispose(); diff --git a/src/node.h b/src/node.h index 04f1bb4feb..5c30b4c90e 100644 --- a/src/node.h +++ b/src/node.h @@ -13,7 +13,6 @@ enum encoding {UTF8, RAW}; void node_fatal_exception (v8::TryCatch &try_catch); -#define node_loop() ev_default_loop(0) void node_exit (int code); // call this after creating a new eio event. diff --git a/src/timers.cc b/src/timers.cc index 967cfb27e1..a427b22f6b 100644 --- a/src/timers.cc +++ b/src/timers.cc @@ -68,12 +68,12 @@ Timer::Timer (Handle callback, ev_tstamp after, ev_tstamp repeat) ev_timer_init(&watcher_, Timer::OnTimeout, after, repeat); watcher_.data = this; - ev_timer_start(node_loop(), &watcher_); + ev_timer_start(EV_DEFAULT_UC_ &watcher_); } Timer::~Timer () { - ev_timer_stop (node_loop(), &watcher_); + ev_timer_stop (EV_DEFAULT_UC_ &watcher_); handle_->SetInternalField(0, Undefined()); handle_.Dispose(); }