You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

136 lines
3.3 KiB

'use strict';
const domain = require('domain');
const EE = require('events');
const fs = require('fs');
const net = require('net');
const util = require('util');
// Low-level debug printer used by the domain error handlers in this file.
const print = process._rawDebug;
// Paths of every pipe server created by pipeData(); unlinked on process exit.
const pipeList = [];
const FILENAME = '/tmp/tmp.tmp';
const PIPENAME = '/tmp/node-domain-example-';
const FILESIZE = 1024;
// Counter appended to each connection id so that ids never collide.
let uid = 0;

// Setting up temporary resources: a FILESIZE-byte file of random printable
// ASCII that is streamed back to every client.
// Buffer.alloc() replaces the deprecated Buffer(size) call form.
const buf = Buffer.alloc(FILESIZE);
for (let i = 0; i < buf.length; i++) {
  // Whole byte values in 48..125 ('0' through '}'); the explicit floor avoids
  // relying on the typed array silently truncating a float.
  buf[i] = Math.floor(Math.random() * 78) + 48; // Basic ASCII
}
fs.writeFileSync(FILENAME, buf);
/**
 * Event-emitting wrapper around a client connection.
 *
 * The connection is added to a private domain; any error routed to that
 * domain marks the resource as no longer alive.
 *
 * Events:
 *   'data' (chunk) - emitted by write() just before the chunk is written.
 *   'end'          - emitted once, the first time end() is called.
 */
class ConnectionResource extends EE {
  /**
   * @param {object} c - Connection to wrap. It is passed to domain.add() and
   *   must provide write(chunk) and end(chunk).
   */
  constructor(c) {
    super();
    this._connection = c;
    this._alive = true;
    // Tracked separately from _alive: a domain error clears _alive without
    // ending the connection, and end() must still run once after that.
    this._ended = false;
    this._domain = domain.create();
    // Up to 8 random base-32 characters followed by the running counter.
    this._id = Math.random().toString(32).slice(2, 10) + (++uid);
    this._domain.add(c);
    this._domain.on('error', () => {
      this._alive = false;
    });
  }

  /**
   * Ends the wrapped connection and emits 'end'. Calls after the first one
   * are ignored, so listeners run their teardown exactly once.
   *
   * @param {*} [chunk] - Optional final chunk handed to the connection.
   */
  end(chunk) {
    if (this._ended) {
      return;
    }
    this._ended = true;
    this._alive = false;
    this._connection.end(chunk);
    this.emit('end');
  }

  /** @returns {boolean} false once end() was called or the domain saw an error. */
  isAlive() {
    return this._alive;
  }

  /** @returns {string} The identifier generated in the constructor. */
  id() {
    return this._id;
  }

  /**
   * Emits 'data' with the chunk, then writes it to the connection.
   *
   * @param {*} chunk - Data to forward.
   * @returns {*} Whatever the connection's write() returned.
   */
  write(chunk) {
    this.emit('data', chunk);
    return this._connection.write(chunk);
  }
}
// Example begin
// For every client: wrap the socket in a ConnectionResource, start streaming
// FILENAME to it, expose the same data on a per-connection pipe, and end the
// resource when the socket closes.
net.createServer((c) => {
  const cr = new ConnectionResource(c);
  const d1 = domain.create();
  // d1.intercept() reports a failed fs.open() as an 'error' event on d1.
  // Without a listener that event is unhandled, so treat it the same way the
  // d2 and d3 handlers in this file do: log it and end the connection.
  d1.on('error', (er) => {
    print('d1 error:', er.message);
    cr.end();
  });
  fs.open(FILENAME, 'r', d1.intercept((fd) => {
    streamInParts(fd, cr, 0);
  }));
  pipeData(cr);
  c.on('close', () => cr.end());
}).listen(8080);
/**
 * Reads up to 10 bytes from `fd` at offset `pos`, forwards exactly the bytes
 * that were read to `cr`, and schedules itself for the next offset until the
 * file is exhausted, FILESIZE bytes have been written, or `cr` is no longer
 * alive. The descriptor is closed when streaming stops or a d2 error occurs.
 *
 * @param {number} fd - Open file descriptor to read from.
 * @param {object} cr - Connection resource; uses isAlive(), write(), end()
 *   and `_connection` (bytesWritten, once('drain')).
 * @param {number} pos - File offset for this read.
 */
function streamInParts(fd, cr, pos) {
  const CHUNK_SIZE = 10;
  const d2 = domain.create();
  let fdClosed = false;

  // Close the descriptor at most once per invocation. A callback is always
  // passed because some Node.js releases reject fs.close() without one.
  const closeFd = () => {
    if (fdClosed) {
      return;
    }
    fdClosed = true;
    fs.close(fd, (er) => {
      if (er) {
        print('close error:', er.message);
      }
    });
  };

  d2.on('error', (er) => {
    print('d2 error:', er.message);
    cr.end();
    // Nothing else will touch the descriptor after a failure.
    closeFd();
  });

  const onRead = d2.intercept((bRead, buf) => {
    if (!cr.isAlive()) {
      closeFd();
      return;
    }

    // Only the first bRead bytes hold file data; the rest of the buffer is
    // padding and must not reach the client.
    const chunk = buf.subarray(0, bRead);
    const atEof = bRead === 0;

    if (!atEof && cr._connection.bytesWritten < FILESIZE) {
      const nextPos = pos + bRead;
      // Documentation says callback is optional, but doesn't mention that if
      // the write fails an exception will be thrown.
      const goodtogo = cr.write(chunk);
      if (goodtogo) {
        setTimeout(() => streamInParts(fd, cr, nextPos), 1000);
      } else {
        cr._connection.once('drain', () => streamInParts(fd, cr, nextPos));
      }
      return;
    }

    // Done: pass along a final partial chunk if there is one, otherwise just
    // end the connection.
    cr.end(atEof ? undefined : chunk);
    closeFd();
  });

  // Buffer.alloc() replaces the deprecated `new Buffer(size)`.
  fs.read(fd, Buffer.alloc(CHUNK_SIZE), 0, CHUNK_SIZE, pos, onRead);
}
/**
 * Starts a pipe server at PIPENAME + cr.id() and mirrors every chunk that
 * `cr` emits as 'data' to all clients connected to that pipe. On the first
 * 'end' from `cr`, the clients are ended and the pipe server is closed.
 * The pipe path is recorded in pipeList.
 *
 * @param {object} cr - Connection resource; uses id(), end(), and the
 *   'data' / 'end' events.
 */
function pipeData(cr) {
  const pname = PIPENAME + cr.id();
  const ps = net.createServer();
  const d3 = domain.create();
  // A Set makes removal a no-op for unknown entries, unlike
  // splice(indexOf(x), 1), which drops the last element when x is absent.
  const connections = new Set();

  d3.on('error', (er) => {
    print('d3 error:', er.message);
    cr.end();
  });
  d3.add(ps);

  ps.on('connection', (conn) => {
    connections.add(conn);
    conn.on('data', () => {}); // don't care about incoming data.
    conn.on('close', () => {
      connections.delete(conn);
    });
  });

  const forward = (chunk) => {
    for (const conn of connections) {
      conn.write(chunk);
    }
  };
  cr.on('data', forward);

  // once(): 'end' may be emitted more than once for a connection, but the
  // pipe must be torn down a single time.
  cr.once('end', () => {
    cr.removeListener('data', forward);
    for (const conn of connections) {
      conn.end();
    }
    ps.close();
  });

  pipeList.push(pname);
  ps.listen(pname);
}
process.on('SIGINT', () => process.exit());
process.on('exit', () => {
try {
for (var i = 0; i < pipeList.length; i++) {
fs.unlinkSync(pipeList[i]);
}
fs.unlinkSync(FILENAME);
} catch (e) { }
});