You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

240 lines
5.8 KiB

#include "js_stream.h"
#include "async-wrap.h"
#include "env.h"
#include "env-inl.h"
#include "node_buffer.h"
#include "stream_base.h"
#include "stream_base-inl.h"
#include "v8.h"
namespace node {
using v8::Array;
using v8::Context;
using v8::External;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Local;
using v8::Object;
using v8::Value;
JSStream::JSStream(Environment* env, Local<Object> obj, AsyncWrap* parent)
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_JSSTREAM, parent),
StreamBase(env) {
node::Wrap(obj, this);
MakeWeak<JSStream>(this);
}
JSStream::~JSStream() {
}
void* JSStream::Cast() {
return static_cast<void*>(this);
}
AsyncWrap* JSStream::GetAsyncWrap() {
return static_cast<AsyncWrap*>(this);
}
bool JSStream::IsAlive() {
v8::Local<v8::Value> fn = object()->Get(env()->isalive_string());
if (!fn->IsFunction())
return false;
return MakeCallback(fn.As<v8::Function>(), 0, nullptr)->IsTrue();
}
bool JSStream::IsClosing() {
return MakeCallback(env()->isclosing_string(), 0, nullptr)->IsTrue();
}
int JSStream::ReadStart() {
return MakeCallback(env()->onreadstart_string(), 0, nullptr)->Int32Value();
}
int JSStream::ReadStop() {
return MakeCallback(env()->onreadstop_string(), 0, nullptr)->Int32Value();
}
int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
HandleScope scope(env()->isolate());
Local<Value> argv[] = {
req_wrap->object()
};
req_wrap->Dispatched();
Local<Value> res =
MakeCallback(env()->onshutdown_string(), arraysize(argv), argv);
return res->Int32Value();
}
int JSStream::DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) {
CHECK_EQ(send_handle, nullptr);
HandleScope scope(env()->isolate());
Local<Array> bufs_arr = Array::New(env()->isolate(), count);
Local<Object> buf;
for (size_t i = 0; i < count; i++) {
buf = Buffer::Copy(env(), bufs[i].base, bufs[i].len).ToLocalChecked();
bufs_arr->Set(i, buf);
}
Local<Value> argv[] = {
w->object(),
bufs_arr
};
w->Dispatched();
Local<Value> res =
MakeCallback(env()->onwrite_string(), arraysize(argv), argv);
return res->Int32Value();
}
void JSStream::New(const FunctionCallbackInfo<Value>& args) {
// This constructor should not be exposed to public javascript.
// Therefore we assert that we are not trying to call this as a
// normal function.
CHECK(args.IsConstructCall());
Environment* env = Environment::GetCurrent(args);
JSStream* wrap;
if (args.Length() == 0) {
wrap = new JSStream(env, args.This(), nullptr);
} else if (args[0]->IsExternal()) {
void* ptr = args[0].As<External>()->Value();
wrap = new JSStream(env, args.This(), static_cast<AsyncWrap*>(ptr));
} else {
UNREACHABLE();
}
CHECK(wrap);
}
static void FreeCallback(char* data, void* hint) {
// Intentional no-op
}
void JSStream::DoAlloc(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
uv_buf_t buf;
wrap->OnAlloc(args[0]->Int32Value(), &buf);
Local<Object> vbuf = Buffer::New(
wrap->env(),
buf.base,
buf.len,
FreeCallback,
nullptr).ToLocalChecked();
return args.GetReturnValue().Set(vbuf);
}
void JSStream::DoRead(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
CHECK(Buffer::HasInstance(args[1]));
uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1]));
wrap->OnRead(args[0]->Int32Value(), &buf);
}
void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
CHECK(args[0]->IsObject());
WriteWrap* w;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());
wrap->OnAfterWrite(w);
}
template <class Wrap>
void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
Wrap* w;
CHECK(args[0]->IsObject());
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());
w->Done(args[1]->Int32Value());
}
void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
CHECK(Buffer::HasInstance(args[0]));
char* data = Buffer::Data(args[0]);
int len = Buffer::Length(args[0]);
do {
uv_buf_t buf;
ssize_t avail = len;
wrap->OnAlloc(len, &buf);
if (static_cast<ssize_t>(buf.len) < avail)
avail = buf.len;
memcpy(buf.base, data, avail);
data += avail;
len -= avail;
wrap->OnRead(avail, &buf);
} while (len != 0);
}
void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
wrap->OnRead(UV_EOF, nullptr);
}
void JSStream::Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context) {
Environment* env = Environment::GetCurrent(context);
Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"));
t->InstanceTemplate()->SetInternalFieldCount(1);
env->SetProtoMethod(t, "doAlloc", DoAlloc);
env->SetProtoMethod(t, "doRead", DoRead);
env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite);
env->SetProtoMethod(t, "finishWrite", Finish<WriteWrap>);
env->SetProtoMethod(t, "finishShutdown", Finish<ShutdownWrap>);
env->SetProtoMethod(t, "readBuffer", ReadBuffer);
env->SetProtoMethod(t, "emitEOF", EmitEOF);
StreamBase::AddMethods<JSStream>(env, t, StreamBase::kFlagHasWritev);
target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"),
t->GetFunction());
env->set_jsstream_constructor_template(t);
}
} // namespace node
NODE_MODULE_CONTEXT_AWARE_BUILTIN(js_stream, node::JSStream::Initialize)