Browse Source

streams: introduce StreamWrap and JSStream

Introduce a way to wrap plain-js `stream.Duplex` streams into C++
StreamBase's child class. With such method at hand it is now possible to
pass `stream.Duplex` instance as a `socket` parameter to
`tls.connect()`.

PR-URL: https://github.com/iojs/io.js/pull/926
Reviewed-By: Chris Dickinson <christopher.s.dickinson@gmail.com>
v1.8.0-commit
Fedor Indutny 10 years ago
parent
commit
1738c77835
  1. 118
      lib/_stream_wrap.js
  2. 15
      lib/_tls_wrap.js
  3. 3
      node.gyp
  4. 1
      src/async-wrap.h
  5. 7
      src/env.h
  6. 199
      src/js_stream.cc
  7. 30
      src/js_stream.h
  8. 5
      src/node_wrap.h
  9. 26
      src/stream_base.cc
  10. 16
      src/stream_base.h
  11. 18
      src/stream_wrap.cc
  12. 10
      src/stream_wrap.h
  13. 22
      src/tls_wrap.cc
  14. 11
      src/tls_wrap.h
  15. 69
      test/parallel/test-tls-js-stream.js

118
lib/_stream_wrap.js

@ -0,0 +1,118 @@
const util = require('util');
const Socket = require('net').Socket;
const JSStream = process.binding('js_stream').JSStream;
const uv = process.binding('uv');
function StreamWrap(stream) {
var handle = new JSStream();
this.stream = stream;
var self = this;
handle.close = function(cb) {
cb();
};
handle.isAlive = function() {
return self.isAlive();
};
handle.isClosing = function() {
return self.isClosing();
};
handle.onreadstart = function() {
return self.readStart();
};
handle.onreadstop = function() {
return self.readStop();
};
handle.onshutdown = function(req) {
return self.shutdown(req);
};
handle.onwrite = function(req, bufs) {
return self.write(req, bufs);
};
this.stream.pause();
this.stream.on('data', function(chunk) {
self._handle.readBuffer(chunk);
});
this.stream.once('end', function() {
self._handle.emitEOF();
});
this.stream.on('error', function(err) {
self.emit('error', err);
});
Socket.call(this, {
handle: handle
});
}
util.inherits(StreamWrap, Socket);
module.exports = StreamWrap;
// require('_stream_wrap').StreamWrap
StreamWrap.StreamWrap = StreamWrap;
StreamWrap.prototype.isAlive = function isAlive() {
return this.readable && this.writable;
};
StreamWrap.prototype.isClosing = function isClosing() {
return !this.isAlive();
};
StreamWrap.prototype.readStart = function readStart() {
this.stream.resume();
return 0;
};
StreamWrap.prototype.readStop = function readStop() {
this.stream.pause();
return 0;
};
StreamWrap.prototype.shutdown = function shutdown(req) {
var self = this;
this.stream.end(function() {
// Ensure that write was dispatched
setImmediate(function() {
self._handle.finishShutdown(req, 0);
});
});
return 0;
};
StreamWrap.prototype.write = function write(req, bufs) {
var pending = bufs.length;
var self = this;
self.stream.cork();
bufs.forEach(function(buf) {
self.stream.write(buf, done);
});
self.stream.uncork();
function done(err) {
if (!err && --pending !== 0)
return;
// Ensure that this is called once in case of error
pending = 0;
// Ensure that write was dispatched
setImmediate(function() {
var errCode = 0;
if (err) {
if (err.code && uv['UV_' + err.code])
errCode = uv['UV_' + err.code];
else
errCode = uv.UV_EPIPE;
}
self._handle.doAfterWrite(req);
self._handle.finishWrite(req, errCode);
});
}
return 0;
};

15
lib/_tls_wrap.js

@ -7,6 +7,8 @@ const tls = require('tls');
const util = require('util');
const listenerCount = require('events').listenerCount;
const common = require('_tls_common');
const StreamWrap = require('_stream_wrap').StreamWrap;
const Duplex = require('stream').Duplex;
const debug = util.debuglog('tls');
const Timer = process.binding('timer_wrap').Timer;
const tls_wrap = process.binding('tls_wrap');
@ -224,6 +226,10 @@ function TLSSocket(socket, options) {
this.authorized = false;
this.authorizationError = null;
// Wrap plain JS Stream into StreamWrap
if (!(socket instanceof net.Socket) && socket instanceof Duplex)
socket = new StreamWrap(socket);
// Just a documented property to make secure sockets
// distinguishable from regular ones.
this.encrypted = true;
@ -280,7 +286,8 @@ TLSSocket.prototype._wrapHandle = function(handle) {
// Proxy HandleWrap, PipeWrap and TCPWrap methods
proxiedMethods.forEach(function(name) {
res[name] = function methodProxy() {
return handle[name].apply(handle, arguments);
if (handle[name])
return handle[name].apply(handle, arguments);
};
});
@ -373,7 +380,7 @@ TLSSocket.prototype._init = function(socket) {
this.setTimeout(options.handshakeTimeout, this._handleTimeout);
// Socket already has some buffered data - emulate receiving it
if (socket && socket._readableState.length) {
if (socket && socket._readableState && socket._readableState.length) {
var buf;
while ((buf = socket.read()) !== null)
ssl.receive(buf);
@ -388,6 +395,10 @@ TLSSocket.prototype._init = function(socket) {
self._connecting = false;
self.emit('connect');
});
socket.on('error', function(err) {
self._tlsError(err);
});
}
// Assume `tls.connect()`

3
node.gyp

@ -56,6 +56,7 @@
'lib/_stream_duplex.js',
'lib/_stream_transform.js',
'lib/_stream_passthrough.js',
'lib/_stream_wrap.js',
'lib/string_decoder.js',
'lib/sys.js',
'lib/timers.js',
@ -95,6 +96,7 @@
'src/fs_event_wrap.cc',
'src/cares_wrap.cc',
'src/handle_wrap.cc',
'src/js_stream.cc',
'src/node.cc',
'src/node_buffer.cc',
'src/node_constants.cc',
@ -132,6 +134,7 @@
'src/env.h',
'src/env-inl.h',
'src/handle_wrap.h',
'src/js_stream.h',
'src/node.h',
'src/node_buffer.h',
'src/node_constants.h',

1
src/async-wrap.h

@ -17,6 +17,7 @@ namespace node {
V(FSREQWRAP) \
V(GETADDRINFOREQWRAP) \
V(GETNAMEINFOREQWRAP) \
V(JSSTREAM) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
V(QUERYWRAP) \

7
src/env.h

@ -107,6 +107,8 @@ namespace node {
V(ipv4_string, "IPv4") \
V(ipv6_lc_string, "ipv6") \
V(ipv6_string, "IPv6") \
V(isalive_string, "isAlive") \
V(isclosing_string, "isClosing") \
V(issuer_string, "issuer") \
V(issuercert_string, "issuerCertificate") \
V(kill_signal_string, "killSignal") \
@ -141,9 +143,13 @@ namespace node {
V(onnewsessiondone_string, "onnewsessiondone") \
V(onocspresponse_string, "onocspresponse") \
V(onread_string, "onread") \
V(onreadstart_string, "onreadstart") \
V(onreadstop_string, "onreadstop") \
V(onselect_string, "onselect") \
V(onshutdown_string, "onshutdown") \
V(onsignal_string, "onsignal") \
V(onstop_string, "onstop") \
V(onwrite_string, "onwrite") \
V(output_string, "output") \
V(order_string, "order") \
V(owner_string, "owner") \
@ -225,6 +231,7 @@ namespace node {
V(context, v8::Context) \
V(domain_array, v8::Array) \
V(fs_stats_constructor_function, v8::Function) \
V(jsstream_constructor_template, v8::FunctionTemplate) \
V(module_load_list_array, v8::Array) \
V(pipe_constructor_template, v8::FunctionTemplate) \
V(process_object, v8::Object) \

199
src/js_stream.cc

@ -3,19 +3,218 @@
#include "async-wrap.h"
#include "env.h"
#include "env-inl.h"
#include "node_buffer.h"
#include "stream_base.h"
#include "v8.h"
namespace node {
using v8::Array;
using v8::Context;
using v8::External;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Handle;
using v8::HandleScope;
using v8::Local;
using v8::Object;
using v8::Value;
JSStream::JSStream(Environment* env, Handle<Object> obj, AsyncWrap* parent)
: StreamBase(env),
AsyncWrap(env, obj, AsyncWrap::PROVIDER_JSSTREAM, parent) {
node::Wrap(obj, this);
}
JSStream::~JSStream() {
}
void* JSStream::Cast() {
return static_cast<void*>(this);
}
AsyncWrap* JSStream::GetAsyncWrap() {
return static_cast<AsyncWrap*>(this);
}
bool JSStream::IsAlive() {
return MakeCallback(env()->isalive_string(), 0, nullptr)->IsTrue();
}
bool JSStream::IsClosing() {
return MakeCallback(env()->isclosing_string(), 0, nullptr)->IsTrue();
}
int JSStream::ReadStart() {
return MakeCallback(env()->onreadstart_string(), 0, nullptr)->Int32Value();
}
int JSStream::ReadStop() {
return MakeCallback(env()->onreadstop_string(), 0, nullptr)->Int32Value();
}
int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
HandleScope scope(env()->isolate());
Local<Value> argv[] = {
req_wrap->object()
};
Local<Value> res =
MakeCallback(env()->onshutdown_string(), ARRAY_SIZE(argv), argv);
return res->Int32Value();
}
int JSStream::DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) {
CHECK_EQ(send_handle, nullptr);
HandleScope scope(env()->isolate());
Local<Array> bufs_arr = Array::New(env()->isolate(), count);
for (size_t i = 0; i < count; i++)
bufs_arr->Set(i, Buffer::New(env(), bufs[0].base, bufs[0].len));
Local<Value> argv[] = {
w->object(),
bufs_arr
};
Local<Value> res =
MakeCallback(env()->onwrite_string(), ARRAY_SIZE(argv), argv);
return res->Int32Value();
}
void JSStream::New(const FunctionCallbackInfo<Value>& args) {
// This constructor should not be exposed to public javascript.
// Therefore we assert that we are not trying to call this as a
// normal function.
CHECK(args.IsConstructCall());
Environment* env = Environment::GetCurrent(args);
JSStream* wrap;
if (args.Length() == 0) {
wrap = new JSStream(env, args.This(), nullptr);
} else if (args[0]->IsExternal()) {
void* ptr = args[0].As<External>()->Value();
wrap = new JSStream(env, args.This(), static_cast<AsyncWrap*>(ptr));
} else {
UNREACHABLE();
}
CHECK(wrap);
}
static void FreeCallback(char* data, void* hint) {
// Intentional no-op
}
void JSStream::DoAlloc(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap = Unwrap<JSStream>(args.Holder());
uv_buf_t buf;
wrap->OnAlloc(args[0]->Int32Value(), &buf);
args.GetReturnValue().Set(Buffer::New(wrap->env(),
buf.base,
buf.len,
FreeCallback,
nullptr));
}
void JSStream::DoRead(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap = Unwrap<JSStream>(args.Holder());
CHECK(Buffer::HasInstance(args[1]));
uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1]));
wrap->OnRead(args[0]->Int32Value(), &buf);
}
void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap = Unwrap<JSStream>(args.Holder());
WriteWrap* w = Unwrap<WriteWrap>(args[0].As<Object>());
wrap->OnAfterWrite(w);
}
template <class Wrap>
void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
Wrap* w = Unwrap<Wrap>(args[0].As<Object>());
w->Done(args[0]->Int32Value());
}
void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap = Unwrap<JSStream>(args.Holder());
CHECK(Buffer::HasInstance(args[0]));
char* data = Buffer::Data(args[0]);
int len = Buffer::Length(args[0]);
do {
uv_buf_t buf;
ssize_t avail = len;
wrap->OnAlloc(len, &buf);
if (static_cast<ssize_t>(buf.len) < avail)
avail = buf.len;
memcpy(buf.base, data, avail);
data += avail;
len -= avail;
wrap->OnRead(avail, &buf);
} while (len != 0);
}
void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap = Unwrap<JSStream>(args.Holder());
wrap->OnRead(UV_EOF, nullptr);
}
void JSStream::Initialize(Handle<Object> target,
Handle<Value> unused,
Handle<Context> context) {
Environment* env = Environment::GetCurrent(context);
Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"));
t->InstanceTemplate()->SetInternalFieldCount(1);
env->SetProtoMethod(t, "doAlloc", DoAlloc);
env->SetProtoMethod(t, "doRead", DoRead);
env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite);
env->SetProtoMethod(t, "finishWrite", Finish<WriteWrap>);
env->SetProtoMethod(t, "finishShutdown", Finish<ShutdownWrap>);
env->SetProtoMethod(t, "readBuffer", ReadBuffer);
env->SetProtoMethod(t, "emitEOF", EmitEOF);
StreamBase::AddMethods<JSStream>(env, t);
target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"),
t->GetFunction());
env->set_jsstream_constructor_template(t);
}
} // namespace node
NODE_MODULE_CONTEXT_AWARE_BUILTIN(js_stream, node::JSStream::Initialize)

30
src/js_stream.h

@ -8,11 +8,39 @@
namespace node {
class JSStream : public StreamBase {
class JSStream : public StreamBase, public AsyncWrap {
public:
static void Initialize(v8::Handle<v8::Object> target,
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
void* Cast() override;
bool IsAlive() override;
bool IsClosing() override;
int ReadStart() override;
int ReadStop() override;
int DoShutdown(ShutdownWrap* req_wrap) override;
int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) override;
protected:
JSStream(Environment* env, v8::Handle<v8::Object> obj, AsyncWrap* parent);
~JSStream();
AsyncWrap* GetAsyncWrap() override;
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void DoAlloc(const v8::FunctionCallbackInfo<v8::Value>& args);
static void DoRead(const v8::FunctionCallbackInfo<v8::Value>& args);
static void DoAfterWrite(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ReadBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
static void EmitEOF(const v8::FunctionCallbackInfo<v8::Value>& args);
template <class Wrap>
static void Finish(const v8::FunctionCallbackInfo<v8::Value>& args);
};
} // namespace node

5
src/node_wrap.h

@ -3,6 +3,7 @@
#include "env.h"
#include "env-inl.h"
#include "js_stream.h"
#include "pipe_wrap.h"
#include "tcp_wrap.h"
#include "tty_wrap.h"
@ -40,6 +41,10 @@ namespace node {
env->tls_wrap_constructor_template()->HasInstance(obj)) { \
TLSWrap* const wrap = Unwrap<TLSWrap>(obj); \
BODY \
} else if (env->jsstream_constructor_template().IsEmpty() == false && \
env->jsstream_constructor_template()->HasInstance(obj)) { \
JSStream* const wrap = Unwrap<JSStream>(obj); \
BODY \
} \
}); \
} while (0)

26
src/stream_base.cc

@ -5,6 +5,7 @@
#include "node_buffer.h"
#include "env.h"
#include "env-inl.h"
#include "js_stream.h"
#include "string_bytes.h"
#include "tls_wrap.h"
#include "util.h"
@ -34,6 +35,8 @@ template void StreamBase::AddMethods<StreamWrap>(Environment* env,
Handle<FunctionTemplate> t);
template void StreamBase::AddMethods<TLSWrap>(Environment* env,
Handle<FunctionTemplate> t);
template void StreamBase::AddMethods<JSStream>(Environment* env,
Handle<FunctionTemplate> t);
template <class Base>
@ -488,8 +491,29 @@ void StreamBase::EmitData(ssize_t nread,
}
AsyncWrap* StreamBase::GetAsyncWrap() {
bool StreamBase::IsIPCPipe() {
return false;
}
int StreamBase::GetFD() {
return -1;
}
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
// No TryWrite by default
return 0;
}
const char* StreamResource::Error() const {
return nullptr;
}
void StreamResource::ClearError() {
// No-op
}
} // namespace node

16
src/stream_base.h

@ -106,13 +106,13 @@ class StreamResource {
virtual ~StreamResource() = default;
virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
virtual int DoTryWrite(uv_buf_t** bufs, size_t* count) = 0;
virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
virtual const char* Error() const = 0;
virtual void ClearError() = 0;
virtual const char* Error() const;
virtual void ClearError();
// Events
inline void OnAfterWrite(WriteWrap* w) {
@ -127,7 +127,7 @@ class StreamResource {
inline void OnRead(size_t nread,
const uv_buf_t* buf,
uv_handle_type pending) {
uv_handle_type pending = UV_UNKNOWN_HANDLE) {
if (read_cb_ != nullptr)
read_cb_(nread, buf, pending, read_ctx_);
}
@ -163,10 +163,10 @@ class StreamBase : public StreamResource {
v8::Handle<v8::FunctionTemplate> target);
virtual void* Cast() = 0;
virtual bool IsAlive() const = 0;
virtual bool IsClosing() const = 0;
virtual bool IsIPCPipe() const = 0;
virtual int GetFD() const = 0;
virtual bool IsAlive() = 0;
virtual bool IsClosing() = 0;
virtual bool IsIPCPipe();
virtual int GetFD();
virtual int ReadStart() = 0;
virtual int ReadStop() = 0;

18
src/stream_wrap.cc

@ -84,7 +84,7 @@ void StreamWrap::AddMethods(Environment* env,
}
int StreamWrap::GetFD() const {
int StreamWrap::GetFD() {
int fd = -1;
#if !defined(_WIN32)
if (stream() != nullptr)
@ -94,12 +94,12 @@ int StreamWrap::GetFD() const {
}
bool StreamWrap::IsAlive() const {
bool StreamWrap::IsAlive() {
return HandleWrap::IsAlive(this);
}
bool StreamWrap::IsClosing() const {
bool StreamWrap::IsClosing() {
return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
}
@ -114,7 +114,7 @@ AsyncWrap* StreamWrap::GetAsyncWrap() {
}
bool StreamWrap::IsIPCPipe() const {
bool StreamWrap::IsIPCPipe() {
return is_named_pipe_ipc();
}
@ -359,16 +359,6 @@ void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
wrap->UpdateWriteQueueSize();
}
const char* StreamWrap::Error() const {
return nullptr;
}
void StreamWrap::ClearError() {
// No-op
}
} // namespace node
NODE_MODULE_CONTEXT_AWARE_BUILTIN(stream_wrap, node::StreamWrap::Initialize)

10
src/stream_wrap.h

@ -19,11 +19,11 @@ class StreamWrap : public HandleWrap, public StreamBase {
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
int GetFD() const override;
int GetFD() override;
void* Cast() override;
bool IsAlive() const override;
bool IsClosing() const override;
bool IsIPCPipe() const override;
bool IsAlive() override;
bool IsClosing() override;
bool IsIPCPipe() override;
// JavaScript functions
int ReadStart() override;
@ -36,8 +36,6 @@ class StreamWrap : public HandleWrap, public StreamBase {
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) override;
const char* Error() const override;
void ClearError() override;
inline uv_stream_t* stream() const {
return stream_;

22
src/tls_wrap.cc

@ -216,7 +216,7 @@ void TLSWrap::Receive(const FunctionCallbackInfo<Value>& args) {
size_t copy = buf.len > len ? len : buf.len;
memcpy(buf.base, data, copy);
buf.len = copy;
wrap->stream_->OnRead(buf.len, &buf, UV_UNKNOWN_HANDLE);
wrap->stream_->OnRead(buf.len, &buf);
data += copy;
len -= copy;
@ -414,7 +414,7 @@ void TLSWrap::ClearOut() {
if (static_cast<int>(buf.len) < avail)
avail = buf.len;
memcpy(buf.base, out, avail);
OnRead(avail, &buf, UV_UNKNOWN_HANDLE);
OnRead(avail, &buf);
read -= avail;
}
@ -423,7 +423,7 @@ void TLSWrap::ClearOut() {
int flags = SSL_get_shutdown(ssl_);
if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) {
eof_ = true;
OnRead(UV_EOF, nullptr, UV_UNKNOWN_HANDLE);
OnRead(UV_EOF, nullptr);
}
if (read == -1) {
@ -495,22 +495,22 @@ AsyncWrap* TLSWrap::GetAsyncWrap() {
}
bool TLSWrap::IsIPCPipe() const {
bool TLSWrap::IsIPCPipe() {
return stream_->IsIPCPipe();
}
int TLSWrap::GetFD() const {
int TLSWrap::GetFD() {
return stream_->GetFD();
}
bool TLSWrap::IsAlive() const {
bool TLSWrap::IsAlive() {
return stream_->IsAlive();
}
bool TLSWrap::IsClosing() const {
bool TLSWrap::IsClosing() {
return stream_->IsClosing();
}
@ -536,12 +536,6 @@ void TLSWrap::ClearError() {
}
int TLSWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
// TODO(indutny): Support it
return 0;
}
int TLSWrap::DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
@ -668,7 +662,7 @@ void TLSWrap::DoRead(ssize_t nread,
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
OnRead(nread, nullptr, UV_UNKNOWN_HANDLE);
OnRead(nread, nullptr);
return;
}

11
src/tls_wrap.h

@ -32,16 +32,15 @@ class TLSWrap : public crypto::SSLWrap<TLSWrap>,
v8::Handle<v8::Context> context);
void* Cast() override;
int GetFD() const override;
bool IsAlive() const override;
bool IsClosing() const override;
int GetFD() override;
bool IsAlive() override;
bool IsClosing() override;
// JavaScript functions
int ReadStart() override;
int ReadStop() override;
int DoShutdown(ShutdownWrap* req_wrap) override;
int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
@ -78,7 +77,7 @@ class TLSWrap : public crypto::SSLWrap<TLSWrap>,
TLSWrap(Environment* env,
Kind kind,
StreamBase* steram,
StreamBase* stream,
v8::Handle<v8::Object> stream_obj,
v8::Handle<v8::Object> sc);
@ -104,7 +103,7 @@ class TLSWrap : public crypto::SSLWrap<TLSWrap>,
}
AsyncWrap* GetAsyncWrap() override;
bool IsIPCPipe() const override;
bool IsIPCPipe() override;
// Resource implementation
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);

69
test/parallel/test-tls-js-stream.js

@ -0,0 +1,69 @@
var assert = require('assert');
var stream = require('stream');
var tls = require('tls');
var fs = require('fs');
var net = require('net');
var common = require('../common');
var connected = {
client: 0,
server: 0
};
var server = tls.createServer({
key: fs.readFileSync(common.fixturesDir + '/keys/agent1-key.pem'),
cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem')
}, function(c) {
console.log('new client');
connected.server++;
c.end('ohai');
}).listen(common.PORT, function() {
var raw = net.connect(common.PORT);
var pending = false;
raw.on('readable', function() {
if (pending)
p._read();
});
var p = new stream.Duplex({
read: function read() {
pending = false;
var chunk = raw.read();
if (chunk) {
console.log('read', chunk);
this.push(chunk);
} else {
pending = true;
}
},
write: function write(data, enc, cb) {
console.log('write', data, enc);
raw.write(data, enc, cb);
}
});
var socket = tls.connect({
socket: p,
rejectUnauthorized: false
}, function() {
console.log('client secure');
connected.client++;
socket.end('hello');
socket.resume();
});
socket.once('close', function() {
console.log('client close');
server.close();
});
});
process.once('exit', function() {
assert.equal(connected.client, 1);
assert.equal(connected.server, 1);
});
Loading…
Cancel
Save