Browse Source

file operations are queued.

v0.7.4-release
Ryan 16 years ago
parent
commit
12d31dd0b7
  1. 202
      src/file.cc
  2. 65
      src/file.js
  3. 9
      src/main.js
  4. 16
      src/node.cc
  5. 4
      src/node.h
  6. 2
      wscript

202
src/file.cc

@ -6,11 +6,12 @@
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
using namespace v8;
#define ON_OPEN_SYMBOL v8::String::NewSymbol("onOpen")
#define FD_SYMBOL v8::String::NewSymbol("fd")
#define ACTION_QUEUE_SYMBOL v8::String::NewSymbol("_actionQueue")
class File;
class Callback {
@ -159,18 +160,19 @@ public:
static File* Unwrap (Handle<Object> obj);
Handle<Value> Open (const char *path, const char *mode, Handle<Value> callback);
static Handle<Value> Open (const Arguments& args);
static int AfterOpen (eio_req *req);
void Close ();
static Handle<Value> Close (const Arguments& args);
static int AfterClose (eio_req *req);
void Write (char *buf, size_t length, Callback *callback);
static Handle<Value> Write (const Arguments& args);
static int AfterWrite (eio_req *req);
private:
static void MakeWeak (Persistent<Value> _, void *data);
void CallTopCallback (const int argc, Handle<Value> argv[]);
Persistent<Object> handle_;
};
@ -178,14 +180,17 @@ File::File (Handle<Object> handle)
{
HandleScope scope;
handle_ = Persistent<Object>::New(handle);
Handle<External> external = External::New(this);
handle_->SetInternalField(0, external);
handle_.MakeWeak(this, File::MakeWeak);
handle_->Set(ACTION_QUEUE_SYMBOL, Array::New());
}
File::~File ()
{
Close();
// XXX call close?
handle_->SetInternalField(0, Undefined());
handle_.Dispose();
handle_.Clear();
@ -216,19 +221,28 @@ File::AfterClose (eio_req *req)
file->handle_->Delete(FD_SYMBOL);
}
// TODO
printf("after close\n");
const int argc = 1;
Local<Value> argv[argc];
argv[0] = Integer::New(req->errorno);
file->CallTopCallback(argc, argv);
return 0;
}
void
File::Close ()
Handle<Value>
File::Close (const Arguments& args)
{
Handle<Value> fd_value = handle_->Get(FD_SYMBOL);
HandleScope scope;
File *file = File::Unwrap(args.Holder());
Handle<Value> fd_value = file->handle_->Get(FD_SYMBOL);
int fd = fd_value->IntegerValue();
eio_req *req = eio_close (fd, EIO_PRI_DEFAULT, File::AfterClose, this);
eio_req *req = eio_close (fd, EIO_PRI_DEFAULT, File::AfterClose, file);
node_eio_submit(req);
return Undefined();
}
int
@ -241,39 +255,36 @@ File::AfterOpen (eio_req *req)
file->handle_->Set(FD_SYMBOL, Integer::New(req->result));
}
Handle<Value> callback_value = file->handle_->Get(ON_OPEN_SYMBOL);
if (!callback_value->IsFunction())
return 0;
Handle<Function> callback = Handle<Function>::Cast(callback_value);
file->handle_->Delete(ON_OPEN_SYMBOL);
const int argc = 1;
Handle<Value> argv[argc];
argv[0] = Integer::New(req->errorno);
TryCatch try_catch;
callback->Call(file->handle_, argc, argv);
if(try_catch.HasCaught())
node_fatal_exception(try_catch);
file->CallTopCallback(argc, argv);
return 0;
}
Handle<Value>
File::Open (const char *path, const char *mode, Handle<Value> callback)
File::Open (const Arguments& args)
{
/* check arguments */
if (args.Length() < 1) return Undefined();
if (!args[0]->IsString()) return Undefined();
HandleScope scope;
File *file = File::Unwrap(args.Holder());
// make sure that we don't already have a pending open
if (handle_->Has(ON_OPEN_SYMBOL)) {
return ThrowException(String::New("File object is already being opened."));
if (file->handle_->Has(FD_SYMBOL)) {
return ThrowException(String::New("File object is opened."));
}
if (callback->IsFunction()) handle_->Set(ON_OPEN_SYMBOL, callback);
// Get the current umask
mode_t mask = umask(0);
umask(mask);
String::Utf8Value path(args[0]->ToString());
int flags = O_RDONLY; // default
if (args[1]->IsString()) {
String::AsciiValue mode_v(args[1]->ToString());
char *mode = *mode_v;
// XXX is this interpretation of the mode correct?
// I don't want to to use fopen() directly because eio doesn't support it.
switch(mode[0]) {
@ -287,92 +298,74 @@ File::Open (const char *path, const char *mode, Handle<Value> callback)
flags = O_APPEND | O_CREAT | (mode[1] == '+' ? O_RDWR : O_WRONLY);
break;
}
}
eio_req *req = eio_open (path, flags, mask, EIO_PRI_DEFAULT, File::AfterOpen, this);
// Get the current umask
mode_t mask = umask(0);
umask(mask);
eio_req *req = eio_open (*path, flags, mask, EIO_PRI_DEFAULT, File::AfterOpen, file);
node_eio_submit(req);
return Undefined();
}
int
File::AfterWrite (eio_req *req)
void
File::CallTopCallback (const int argc, Handle<Value> argv[])
{
Callback *callback = static_cast<Callback*>(req->data);
HandleScope scope;
char *buf = static_cast<char*>(req->ptr2);
delete buf;
size_t written = req->result;
if (callback) {
const int argc = 2;
Local<Value> argv[argc];
argv[0] = Integer::New(req->errorno);
argv[1] = written >= 0 ? Integer::New(written) : Integer::New(0);
callback->Call(callback->file->handle_, argc, argv);
Local<Value> queue_value = handle_->Get(ACTION_QUEUE_SYMBOL);
assert(queue_value->IsArray());
delete callback;
}
return 0;
}
Local<Array> queue = Local<Array>::Cast(queue_value);
Local<Value> top_value = queue->Get(Integer::New(0));
if (top_value->IsObject()) {
Local<Object> top = top_value->ToObject();
Local<Value> callback_value = top->Get(String::NewSymbol("callback"));
if (callback_value->IsFunction()) {
Handle<Function> callback = Handle<Function>::Cast(callback_value);
void
File::Write (char *buf, size_t length, Callback *callback)
{
if (callback)
callback->file = this;
// NOTE: -1 offset in eio_write() invokes write() instead of pwrite()
if (handle_->Has(FD_SYMBOL) == false) {
printf("trying to write to a bad fd!\n");
TryCatch try_catch;
callback->Call(handle_, argc, argv);
if(try_catch.HasCaught()) {
node_fatal_exception(try_catch);
return;
}
Handle<Value> fd_value = handle_->Get(FD_SYMBOL);
int fd = fd_value->IntegerValue();
eio_req *req = eio_write(fd, buf, length, -1, EIO_PRI_DEFAULT, File::AfterWrite, callback);
node_eio_submit(req);
}
}
static Handle<Value>
NewFile (const Arguments& args)
{
HandleScope scope;
File *file = new File(args.Holder());
if(file == NULL)
return Undefined(); // XXX raise error?
// poll_actions
Local<Value> poll_actions_value = handle_->Get(String::NewSymbol("_pollActions"));
assert(poll_actions_value->IsFunction());
Handle<Function> poll_actions = Handle<Function>::Cast(poll_actions_value);
return args.This();
poll_actions->Call(handle_, 0, NULL);
}
JS_METHOD(file_open)
{
if (args.Length() < 1) return Undefined();
if (!args[0]->IsString()) return Undefined();
HandleScope scope;
int
File::AfterWrite (eio_req *req)
{
File *file = static_cast<File*>(req->data);
File *file = File::Unwrap(args.Holder());
String::Utf8Value path(args[0]->ToString());
if (args[1]->IsString()) {
String::AsciiValue mode(args[1]->ToString());
return file->Open(*path, *mode, args[2]);
} else {
return file->Open(*path, "r", args[1]);
}
}
char *buf = static_cast<char*>(req->ptr2);
delete buf;
size_t written = req->result;
JS_METHOD(file_close)
{
HandleScope scope;
File *file = File::Unwrap(args.Holder());
file->Close();
const int argc = 2;
Local<Value> argv[argc];
argv[0] = Integer::New(req->errorno);
argv[1] = written >= 0 ? Integer::New(written) : Integer::New(0);
file->CallTopCallback(argc, argv);
return Undefined();
return 0;
}
JS_METHOD(file_write)
Handle<Value>
File::Write (const Arguments& args)
{
if (args.Length() < 1) return Undefined();
if (!args[0]->IsString())
@ -406,13 +399,30 @@ JS_METHOD(file_write)
return Undefined();
}
Callback *callback = args[1]->IsFunction() ? new Callback(args[1]) : NULL;
if (file->handle_->Has(FD_SYMBOL) == false) {
printf("trying to write to a bad fd!\n");
return Undefined();
}
Handle<Value> fd_value = file->handle_->Get(FD_SYMBOL);
int fd = fd_value->IntegerValue();
file->Write(buf, length, callback);
// NOTE: -1 offset in eio_write() invokes write() instead of pwrite()
eio_req *req = eio_write(fd, buf, length, -1, EIO_PRI_DEFAULT, File::AfterWrite, file);
node_eio_submit(req);
return Undefined();
}
static Handle<Value>
NewFile (const Arguments& args)
{
HandleScope scope;
File *file = new File(args.Holder());
if(file == NULL)
return Undefined(); // XXX raise error?
return args.This();
}
void
NodeInit_file (Handle<Object> target)
@ -429,7 +439,7 @@ NodeInit_file (Handle<Object> target)
file_template->InstanceTemplate()->SetInternalFieldCount(1);
target->Set(String::NewSymbol("File"), file_template->GetFunction());
// class method for File
// class methods for File
file_template->GetFunction()->Set(String::NewSymbol("STDIN_FILENO"),
Integer::New(STDIN_FILENO));
@ -439,7 +449,7 @@ NodeInit_file (Handle<Object> target)
file_template->GetFunction()->Set(String::NewSymbol("STDERR_FILENO"),
Integer::New(STDERR_FILENO));
JS_SET_METHOD(file_template->InstanceTemplate(), "open", file_open);
JS_SET_METHOD(file_template->InstanceTemplate(), "close", file_close);
JS_SET_METHOD(file_template->InstanceTemplate(), "write", file_write);
JS_SET_METHOD(file_template->InstanceTemplate(), "_ffi_open", File::Open);
JS_SET_METHOD(file_template->InstanceTemplate(), "_ffi_close", File::Close);
JS_SET_METHOD(file_template->InstanceTemplate(), "_ffi_write", File::Write);
}

65
src/file.js

@ -0,0 +1,65 @@
// Some explanation of the File binding.
//
// All file operations are blocking. To get around this they are executed
// in a thread pool in C++ (libeio).
//
// The ordering of method calls to a file should be preserved, so they are
// only executed one at a time. A queue, called _actionQueue is employed.
//
// The constructor File() is implemented in C++. It initlizes
// the member _actionQueue = []
//
// Any of the methods called on a file are put into this queue. When they
// reach the head of the queue they will be executed. C++ calles the
// method _pollActions each time it becomes idle. If there is no action
// currently being executed then _pollActions will not be called. Thus when
// actions are added to an empty _actionQueue, they should be immediately
// executed.
//
// When an action has completed, the C++ side is going to look at the first
// element of _actionQueue in order to get a handle on the callback
// function. Only after that completion callback has been made can the
// action be shifted out of the queue.
File.prototype.puts = function (data, callback) {
this.write(data + "\n", callback);
};
File.prototype.open = function (path, mode, callback) {
this._addAction("open", [path, mode], callback);
};
File.prototype.close = function (callback) {
this._addAction("close", [], callback);
};
File.prototype.write = function (buf, callback) {
this._addAction("write", [buf], callback);
};
File.prototype._addAction = function (method, args, callback) {
this._actionQueue.push({ method: method
, callback: callback
, args: args
});
if (this._actionQueue.length == 1) this._act();
}
File.prototype._act = function () {
var action = this._actionQueue[0];
if (action)
this["_ffi_" + action.method].apply(this, action.args);
};
// called from C++ after each action finishes
// (i.e. when it returns from the thread pool)
File.prototype._pollActions = function () {
this._actionQueue.shift();
this._act();
};
var stdout = new File();
stdout.fd = File.STDOUT_FILENO;
var stderr = new File();
stderr.fd = File.STDERR_FILENO;

9
src/main.js

@ -1,12 +1,3 @@
File.prototype.puts = function (data, callback) {
this.write(data + "\n", callback);
};
var stdout = new File();
stdout.fd = File.STDOUT_FILENO;
var stderr = new File();
stderr.fd = File.STDERR_FILENO;
// module search paths
node.includes = ["."];

16
src/node.cc

@ -245,16 +245,20 @@ main (int argc, char *argv[])
// NATIVE JAVASCRIPT MODULES
TryCatch try_catch;
Handle<Value> result = ExecuteString(String::New(native_main),
String::New("main.js"));
if (try_catch.HasCaught()) {
ReportException(&try_catch);
return 1;
}
ExecuteString(String::New(native_file), String::New("file.js"));
if (try_catch.HasCaught()) goto native_js_error;
ExecuteString(String::New(native_main), String::New("main.js"));
if (try_catch.HasCaught()) goto native_js_error;
ev_loop(node_loop(), 0);
context.Dispose();
return exit_code;
native_js_error:
ReportException(&try_catch);
return 1;
}

4
src/node.h

@ -6,9 +6,9 @@
#include <v8.h>
#define JS_SYMBOL(name) v8::String::NewSymbol(name)
#define JS_METHOD(name) v8::Handle<v8::Value> jsmethod_##name (const v8::Arguments& args)
#define JS_METHOD(name) v8::Handle<v8::Value> name (const v8::Arguments& args)
#define JS_SET_METHOD(obj, name, callback) \
obj->Set(JS_SYMBOL(name), v8::FunctionTemplate::New(jsmethod_##callback)->GetFunction())
obj->Set(JS_SYMBOL(name), v8::FunctionTemplate::New(callback)->GetFunction())
void node_fatal_exception (v8::TryCatch &try_catch);

2
wscript

@ -100,7 +100,7 @@ def build(bld):
js2c.JS2C(source, targets)
native_cc = bld.new_task_gen(
source="src/main.js",
source="src/file.js src/main.js",
target="src/natives.h",
rule=javascript_in_c,
before="cxx"

Loading…
Cancel
Save