You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

318 lines
8.7 KiB

12 years ago
var mongodb = require('mongodb');
12 years ago
var thunky = require('thunky');
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var Readable = require('stream').Readable || require('readable-stream');
12 years ago
var DRIVER_COLLECTION_PROTO = mongodb.Collection.prototype;
var DRIVER_CURSOR_PROTO = mongodb.Cursor.prototype;
var DRIVER_DB_PROTO = mongodb.Db.prototype;
14 years ago
var noop = function() {};
12 years ago
var forEachMethod = function(oldProto, newProto, fn) {
Object.keys(oldProto).forEach(function(methodName) {
if (oldProto.__lookupGetter__(methodName) || newProto[methodName]) return;
if (methodName[0] === '_' || typeof oldProto[methodName] !== 'function') return;
fn(methodName, oldProto[methodName]);
});
14 years ago
};
12 years ago
var ensureCallback = function(args) {
if (getCallback(args) !== noop) return args;
args = Array.prototype.slice.call(args);
args.push(noop);
return args;
};
12 years ago
var getCallback = function(args) {
var callback = args[args.length-1];
return typeof callback === 'function' ? callback : noop;
};
14 years ago
12 years ago
// Proxy for the native cursor prototype that normalizes method names and
// arguments to fit the mongo shell.
14 years ago
var Cursor = function(oncursor) {
Readable.call(this, {objectMode:true});
12 years ago
this._get = oncursor;
14 years ago
};
util.inherits(Cursor, Readable);
Cursor.prototype.toArray = function() {
12 years ago
this._apply(DRIVER_CURSOR_PROTO.toArray, arguments);
14 years ago
};
12 years ago
14 years ago
Cursor.prototype.next = function() {
12 years ago
this._apply(DRIVER_CURSOR_PROTO.nextObject, arguments);
14 years ago
};
12 years ago
14 years ago
Cursor.prototype.forEach = function() {
12 years ago
this._apply(DRIVER_CURSOR_PROTO.each, arguments);
14 years ago
};
12 years ago
14 years ago
Cursor.prototype.count = function() {
12 years ago
this._apply(DRIVER_CURSOR_PROTO.count, arguments);
14 years ago
};
12 years ago
Cursor.prototype.explain = function() {
this._apply(DRIVER_CURSOR_PROTO.explain, arguments);
};
12 years ago
Cursor.prototype.limit = function() {
return this._config(DRIVER_CURSOR_PROTO.limit, arguments);
14 years ago
};
12 years ago
14 years ago
Cursor.prototype.skip = function() {
12 years ago
return this._config(DRIVER_CURSOR_PROTO.skip, arguments);
14 years ago
};
12 years ago
Cursor.prototype.batchSize = function() {
12 years ago
return this._config(DRIVER_CURSOR_PROTO.batchSize, arguments);
};
14 years ago
12 years ago
Cursor.prototype.sort = function() {
return this._config(DRIVER_CURSOR_PROTO.sort, arguments);
};
Cursor.prototype.destroy = function() {
this._apply(DRIVER_CURSOR_PROTO.close, arguments);
this.push(null);
};
12 years ago
Cursor.prototype._apply = function(fn, args) {
this._get(function(err, cursor) {
if (err) return getCallback(args)(err);
fn.apply(cursor, args);
});
14 years ago
12 years ago
return this;
14 years ago
};
Cursor.prototype._read = function() { // 0.10 stream support (0.8 compat using readable-stream)
var self = this;
this.next(function(err, data) {
if (err) return self.emit('error', err);
self.push(data);
});
};
12 years ago
Cursor.prototype._config = function(fn, args) {
if (typeof args[args.length-1] !== 'function') return this._apply(fn, args);
args = Array.prototype.slice.call(args);
12 years ago
var callback = args.pop();
return this._apply(fn, args).toArray(callback);
14 years ago
};
12 years ago
// Proxy for the native collection prototype that normalizes method names and
// arguments to fit the mongo shell.
var Collection = function(oncollection) {
12 years ago
this._get = oncollection;
14 years ago
};
Collection.prototype.find = function() {
var args = Array.prototype.slice.call(arguments);
12 years ago
var oncollection = this._get;
12 years ago
var oncursor = thunky(function(callback) {
12 years ago
args.push(callback);
oncollection(function(err, collection) {
if (err) return callback(err);
collection.find.apply(collection, args);
});
});
14 years ago
if (typeof args[args.length-1] === 'function') {
var callback = args.pop();
12 years ago
oncursor(function(err, cursor) {
if (err) return callback(err);
cursor.toArray(callback);
});
14 years ago
}
return new Cursor(oncursor);
};
12 years ago
14 years ago
Collection.prototype.findOne = function() { // see http://www.mongodb.org/display/DOCS/Queries+and+Cursors
var args = Array.prototype.slice.call(arguments);
var callback = args.pop();
this.find.apply(this, args).limit(1).next(callback);
};
12 years ago
14 years ago
Collection.prototype.findAndModify = function(options, callback) {
12 years ago
this._apply(DRIVER_COLLECTION_PROTO.findAndModify, [options.query, options.sort || [], options.update || {}, {
new:!!options.new,
remove:!!options.remove,
14 years ago
upsert:!!options.upsert,
fields:options.fields
}, callback || noop]);
};
12 years ago
Collection.prototype.group = function(group, callback) {
this._apply(DRIVER_COLLECTION_PROTO.group, [group.key ? group.key : group.keyf, group.cond, group.initial, group.reduce, group.finalize, true, callback]);
};
12 years ago
Collection.prototype.remove = function() {
var self = this;
var callback = getCallback(arguments);
if (arguments.length > 1 && arguments[1] === true) { // the justOne parameter
this.findOne(arguments[0], function(err, doc) {
if (err || !doc) return callback(err);
self._apply(DRIVER_COLLECTION_PROTO.remove, [doc, callback]);
});
return;
}
this._apply(DRIVER_COLLECTION_PROTO.remove, arguments.length === 0 ? [{}, noop] : ensureCallback(arguments));
};
Collection.prototype.getIndexes = function() {
this._apply(DRIVER_COLLECTION_PROTO.indexes, arguments);
};
Collection.prototype.runCommand = function(cmd, opts, callback) {
callback = callback || noop;
this._get(function(err, collection) {
if (err) return callback(err);
var commandObject = {};
commandObject[cmd] = collection.collectionName;
Object.keys(opts).forEach(function(key) {
commandObject[key] = opts[key];
});
collection.db.command(commandObject, callback);
});
};
12 years ago
forEachMethod(DRIVER_COLLECTION_PROTO, Collection.prototype, function(methodName, fn) {
Collection.prototype[methodName] = function() { // we just proxy the rest of the methods directly
this._apply(fn, ensureCallback(arguments));
12 years ago
};
});
Collection.prototype._apply = function(fn, args) {
this._get(function(err, collection) {
if (err) return getCallback(args)(err);
if (!collection.opts || getCallback(args) === noop) return fn.apply(collection, args);
var safe = collection.opts.safe;
collection.opts.safe = true;
fn.apply(collection, args);
collection.opts.safe = safe;
});
};
14 years ago
var toConnectionString = function(conf) { // backwards compat config map (use a connection string instead)
12 years ago
var options = [];
var hosts = conf.replSet ? conf.replSet.members || conf.replSet : [conf];
var auth = conf.username ? (conf.username+':'+conf.password+'@') : '';
12 years ago
hosts = hosts.map(function(server) {
if (typeof server === 'string') return server;
return (server.host || '127.0.0.1') + ':' + (server.port || 27017);
}).join(',');
12 years ago
if (conf.slaveOk) options.push('slaveOk=true');
return 'mongodb://'+auth+hosts+'/'+conf.db+'?'+options.join('&');
14 years ago
};
12 years ago
var parseConfig = function(cs) {
if (typeof cs === 'object' && cs) return toConnectionString(cs);
if (typeof cs !== 'string') throw new Error('connection string required'); // to avoid undef errors on bad conf
12 years ago
cs = cs.replace(/^\//, '');
if (cs.indexOf('/') < 0) return parseConfig('127.0.0.1/'+cs);
if (cs.indexOf('mongodb://') !== 0) return parseConfig('mongodb://'+cs);
return cs;
};
var Database = function(ondb) {
EventEmitter.call(this);
this._get = ondb;
};
util.inherits(Database, EventEmitter);
Database.prototype.runCommand = function(opts, callback) {
callback = callback || noop;
this._get(function(err, db) {
if (err) return callback(err);
db.command(opts, callback);
});
};
Database.prototype.getCollectionNames = function(callback) {
this.collections(function(err, cols) {
if (err) return callback(err);
callback(cols.map(function(c) { return c.collectionName}));
});
};
Database.prototype.collection = function(name) {
var self = this;
var oncollection = thunky(function(callback) {
self._get(function(err, db) {
if (err) return callback(err);
db.collection(name, callback);
});
});
return new Collection(oncollection);
};
Database.prototype._apply = function(fn, args) {
this._get(function(err, db) {
if (err) return getCallback(args)(err);
fn.apply(db, args);
});
};
forEachMethod(DRIVER_DB_PROTO, Database.prototype, function(methodName, fn) {
Database.prototype[methodName] = function() {
this._apply(fn, arguments);
};
});
12 years ago
var connect = function(config, collections) {
var connectionString = parseConfig(config);
12 years ago
var ondb = thunky(function(callback) {
12 years ago
mongodb.Db.connect(connectionString, function(err, db) {
if (err) return callback(err);
that.client = db;
db.on('error', function(err) {
process.nextTick(function() {
that.emit('error', err);
});
});
12 years ago
callback(null, db);
});
});
var that = new Database(ondb);
12 years ago
that.bson = mongodb.BSONPure; // backwards compat
that.ObjectId = mongodb.ObjectID; // backwards compat
12 years ago
collections = collections || config.collections || [];
collections.forEach(function(colName) {
that[colName] = that.collection(colName);
});
14 years ago
return that;
};
12 years ago
connect.connect = connect; // backwards compat
12 years ago
connect.ObjectId = mongodb.ObjectID;
connect.Cursor = Cursor;
connect.Collection = Collection;
module.exports = connect;