Browse Source

Add Stream base class with stream.pipe

v0.7.4-release
Ryan Dahl 14 years ago
parent
commit
bc695475b9
  1. 10
      lib/http.js
  2. 5
      lib/net.js
  3. 57
      lib/stream.js
  4. 1
      src/node.cc
  5. 20
      test/disabled/pipe-test.js

10
lib/http.js

@ -9,7 +9,7 @@ if (debugLevel & 0x4) {
} }
var net = require('net'); var net = require('net');
var events = require('events'); var Stream = require('stream').Stream;
var FreeList = require('freelist').FreeList; var FreeList = require('freelist').FreeList;
var HTTPParser = process.binding('http_parser').HTTPParser; var HTTPParser = process.binding('http_parser').HTTPParser;
@ -185,7 +185,7 @@ var continueExpression = /100-continue/i;
/* Abstract base class for ServerRequest and ClientResponse. */ /* Abstract base class for ServerRequest and ClientResponse. */
function IncomingMessage (socket) { function IncomingMessage (socket) {
events.EventEmitter.call(this); Stream.call(this);
// TODO Remove one of these eventually. // TODO Remove one of these eventually.
this.socket = socket; this.socket = socket;
@ -205,7 +205,7 @@ function IncomingMessage (socket) {
this.statusCode = null; this.statusCode = null;
this.client = this.socket; this.client = this.socket;
} }
sys.inherits(IncomingMessage, events.EventEmitter); sys.inherits(IncomingMessage, Stream);
exports.IncomingMessage = IncomingMessage; exports.IncomingMessage = IncomingMessage;
IncomingMessage.prototype._parseQueryString = function () { IncomingMessage.prototype._parseQueryString = function () {
@ -282,7 +282,7 @@ IncomingMessage.prototype._addHeaderLine = function (field, value) {
}; };
function OutgoingMessage (socket) { function OutgoingMessage (socket) {
events.EventEmitter.call(this, socket); Stream.call(this);
// TODO Remove one of these eventually. // TODO Remove one of these eventually.
this.socket = socket; this.socket = socket;
@ -301,7 +301,7 @@ function OutgoingMessage (socket) {
this.finished = false; this.finished = false;
} }
sys.inherits(OutgoingMessage, events.EventEmitter); sys.inherits(OutgoingMessage, Stream);
exports.OutgoingMessage = OutgoingMessage; exports.OutgoingMessage = OutgoingMessage;
// This abstract either writing directly to the socket or buffering it. // This abstract either writing directly to the socket or buffering it.

5
lib/net.js

@ -1,6 +1,7 @@
var sys = require("sys"); var sys = require("sys");
var fs = require("fs"); var fs = require("fs");
var events = require("events"); var events = require("events");
var stream = require("stream");
var dns = require('dns'); var dns = require('dns');
var kMinPoolSpace = 128; var kMinPoolSpace = 128;
@ -519,7 +520,7 @@ function initStream (self) {
function Stream (fd, type) { function Stream (fd, type) {
if (!(this instanceof Stream)) return new Stream(fd, type); if (!(this instanceof Stream)) return new Stream(fd, type);
events.EventEmitter.call(this); stream.Stream.call(this);
this.fd = null; this.fd = null;
this.type = null; this.type = null;
@ -531,7 +532,7 @@ function Stream (fd, type) {
setImplmentationMethods(this); setImplmentationMethods(this);
} }
}; };
sys.inherits(Stream, events.EventEmitter); sys.inherits(Stream, stream.Stream);
exports.Stream = Stream; exports.Stream = Stream;

57
lib/stream.js

@ -0,0 +1,57 @@
var events = require('events');
var inherits = require('sys').inherits;
function Stream () {
events.EventEmitter.call(this);
}
inherits(Stream, events.EventEmitter);
exports.Stream = Stream;
Stream.prototype.pipe = function (dest, options) {
var source = this;
source.on("data", function (chunk) {
if (false === dest.write(chunk)) source.pause();
});
dest.on("drain", function () {
if (source.readable) source.resume();
});
/*
* If the 'end' option is not supplied, dest.end() will be called when
* source gets the 'end' event.
*/
options.end = options && options.end === false ? false : true;
if (options.end) {
source.on("end", function () {
dest.end();
});
}
/*
* Questionable:
*/
if (!source.pause) {
source.pause = function () {
source.emit("pause");
};
}
if (!source.resume) {
source.resume = function () {
source.emit("resume");
};
}
dest.on("pause", function () {
source.pause();
});
dest.on("resume", function () {
if (source.readable) source.resume();
});
};

1
src/node.cc

@ -1519,6 +1519,7 @@ static Handle<Value> Binding(const Arguments& args) {
exports->Set(String::New("utils"), String::New(native_utils)); exports->Set(String::New("utils"), String::New(native_utils));
exports->Set(String::New("path"), String::New(native_path)); exports->Set(String::New("path"), String::New(native_path));
exports->Set(String::New("string_decoder"), String::New(native_string_decoder)); exports->Set(String::New("string_decoder"), String::New(native_string_decoder));
exports->Set(String::New("stream"), String::New(native_stream));
binding_cache->Set(module, exports); binding_cache->Set(module, exports);
} else { } else {

20
test/disabled/pipe-test.js

@ -0,0 +1,20 @@
/*
* try with
* curl -d @/usr/share/dict/words http://localhost:8000/123
*/
http = require('http');
s = http.Server(function (req, res) {
console.log(req.headers);
req.pipe(process.stdout, { end: false });
req.on('end', function () {
res.writeHead(200);
res.write("thanks");
res.end();
});
});
s.listen(8000);
Loading…
Cancel
Save