Browse Source

mongodb-core rewrite.

saintedlama/travis-non-legacy
Eduardo Sorribas 10 years ago
parent
commit
0324967490
  1. 16
      README.md
  2. 693
      index.js
  3. 25
      lib/aggregation-cursor.js
  4. 159
      lib/bulk.js
  5. 289
      lib/collection.js
  6. 181
      lib/cursor.js
  7. 125
      lib/database.js
  8. 48
      lib/get-topology.js
  9. 15
      package.json
  10. 6
      test/insert.js
  11. 11
      test/test-aggregate.js
  12. 6
      test/test-cursor-explain.js
  13. 2
      test/test-cursor-foreach.js
  14. 8
      test/test-drop-indexes.js
  15. 2
      test/test-find-sort.js
  16. 22
      test/test-iscapped.js
  17. 45
      test/test-pass-driver-db.js
  18. 1
      test/test-update-and-callback.js
  19. 1
      test/test-update-multi.js
  20. 1
      test/test-update.js

16
README.md

@ -1,11 +1,11 @@
# mongojs
A [node.js](http://nodejs.org) module for mongodb, that emulates [the official mongodb API](http://www.mongodb.org/display/DOCS/Home) as much as possible.
It wraps [mongodb-native](https://github.com/mongodb/node-mongodb-native/) and is available through [npm](http://npmjs.org)
It wraps [mongodb-core](https://github.com/christkv/mongodb-core) and is available through [npm](http://npmjs.org)
npm install mongojs
[![Build Status](https://travis-ci.org/mafintosh/mongojs.svg?branch=master)](https://travis-ci.org/mafintosh/mongojs)
[![build status](https://secure.travis-ci.org/mafintosh/mongojs.png)](http://travis-ci.org/mafintosh/mongojs)
[![Tips](https://img.shields.io/gratipay/mafintosh.svg)](https://gratipay.com/mafintosh)
## Usage
@ -211,9 +211,8 @@ In the above example the `hackers` collection is enabled automagically (similar
## Passing a DB to the constructor
If you have an instance of the mongodb native driver, you can pass this to the constructor and mongojs will
use this instance of the driver internally. This can be useful to write modules that use mongojs without
requiring and additional connection.
If you have an instance of mongojs, you can pass this to the constructor and mongojs will use the
existing connection of that instance instead of creating a new one.
```js
var mongodb = require('mongodb');
@ -224,11 +223,6 @@ mongodb.Db.connect('mongodb://localhost/test', function(err, theDb) {
});
```
It can also be used to use a different version of the driver than the one mongojs has installed, but do this
at your own risk.
You can also do the same passing a mongojs instance instead.
# API
This API documentation is a work in progress.
@ -312,8 +306,6 @@ Get the name of the collection.
#####`cursor.rewind()`
#####`cursor.readPref(mode, tagSet)`
#####`cursor.destroy()`
#### Database

693
index.js

@ -1,688 +1,49 @@
var mongodb = require('mongodb');
var mongodb = require('mongodb-core');
var thunky = require('thunky');
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var Readable = require('stream').Readable || require('readable-stream');
var PassThrough = require('stream').PassThrough || require('readable-stream').PassThrough;
var url = require('url');
var once = require('once');
var toMongodbCore = require('to-mongodb-core');
var parse = require('parse-mongo-url');
var DriverCollection = mongodb.Collection.prototype;
var DriverDb = mongodb.Db.prototype;
var Database = require('./lib/database');
var getTopology = require('./lib/get-topology');
var noop = function() {};
var ensureCallback = function(args) {
if (getCallback(args) !== noop) return args;
args = Array.prototype.slice.call(args);
args.push(noop);
return args;
};
var replaceCallback = function(args, fn) {
args = ensureCallback(args);
args[args.length - 1] = fn;
return args;
};
var getCallback = function(args) {
var callback = args[args.length-1];
return typeof callback === 'function' ? callback : noop;
};
// Proxy for the native cursor prototype that normalizes method names and
// arguments to fit the mongo shell.
var Cursor = function(oncursor) {
Readable.call(this, {objectMode:true, highWaterMark:0});
this._get = oncursor;
};
util.inherits(Cursor, Readable);
Cursor.prototype.toArray = function() {
this._apply('toArray', arguments);
};
Cursor.prototype.next = function() {
this._apply('nextObject', arguments);
};
Cursor.prototype.forEach = function() {
this._apply('each', arguments);
};
Cursor.prototype.count = function() {
this._apply('count', arguments);
};
Cursor.prototype.explain = function() {
this._apply('explain', arguments);
};
Cursor.prototype.limit = function() {
return this._config('limit', arguments);
};
Cursor.prototype.skip = function() {
return this._config('skip', arguments);
};
Cursor.prototype.batchSize = function() {
return this._config('batchSize', arguments);
};
Cursor.prototype.sort = function() {
return this._config('sort', arguments);
};
Cursor.prototype.rewind = function() {
return this._config('rewind', arguments);
};
Cursor.prototype.readPref = function() {
return this._config('setReadPreference', arguments);
};
Cursor.prototype.destroy = function() {
this._apply('close', arguments);
this.push(null);
};
Cursor.prototype.map = function(mapfn, callback) {
this.toArray(function(err, arr) {
if (err) return callback(err);
callback(null, arr.map(mapfn));
});
};
Cursor.prototype.size = function(callback) {
this.count(true, callback);
};
Cursor.prototype._apply = function(fn, args) {
this._get(function(err, cursor) {
if (err) return getCallback(args)(err);
cursor[fn].apply(cursor, args);
});
return this;
};
Cursor.prototype._read = function() { // 0.10 stream support (0.8 compat using readable-stream)
var self = this;
this.next(function(err, data) {
if (err) return self.emit('error', err);
self.push(data);
});
};
Cursor.prototype._config = function(fn, args) {
if (typeof args[args.length-1] !== 'function') return this._apply(fn, args);
args = Array.prototype.slice.call(args);
var callback = args.pop();
return this._apply(fn, args).toArray(callback);
};
var BulkFind = function(onbulkfind) {
this._get = onbulkfind;
};
BulkFind.prototype._apply = function(fn, args) {
this._get(function(err, bulkFind) {
if (err) return getCallback(args)(err);
bulkFind[fn].apply(bulkFind, args);
});
return this;
};
BulkFind.prototype.update = function() {
return this._apply('update', arguments);
};
BulkFind.prototype.updateOne = function() {
return this._apply('updateOne', arguments);
};
BulkFind.prototype.upsert = function() {
return this._apply('upsert', arguments);
};
BulkFind.prototype.remove = function() {
return this._apply('remove', arguments);
};
BulkFind.prototype.removeOne = function() {
return this._apply('removeOne', arguments);
};
BulkFind.prototype.replaceOne = function() {
return this._apply('replaceOne', arguments);
};
var Bulk = function(onbulk) {
this._get = onbulk;
};
Bulk.prototype._apply = function(fn, args) {
this._get(function(err, bulk) {
if (err) return getCallback(args)(err);
bulk[fn].apply(bulk, args);
});
return this;
};
Bulk.prototype.insert = function() {
return this._apply('insert', arguments);
};
Bulk.prototype.execute = function(callback) {
if (!callback) return this.execute(noop);
return this._apply('execute', arguments);
};
Bulk.prototype.find = function() {
var self = this;
var args = arguments;
var onbulkfind = thunky(function(callback) {
self._get(function(err, bulk) {
callback(err, bulk.find.apply(bulk, args))
});
});
return new BulkFind(onbulkfind);
};
// Proxy for the native collection prototype that normalizes method names and
// arguments to fit the mongo shell.
var Collection = function(name, oncollection) {
this._get = oncollection;
this._name = name;
};
Collection.prototype.aggregate = function() {
var args = Array.prototype.slice.call(arguments);
if (typeof args[args.length-1] === 'function') {
return this._apply(DriverCollection.aggregate, ensureCallback(arguments));
}
args.push({cursor: {batchSize: 1}});
var pt = new PassThrough({objectMode: true, highWaterMark: 16});
this._get(function(err, collection) {
if (err) return callback(err);
collection.aggregate.apply(collection, args).pipe(pt);
});
return pt;
};
Collection.prototype.count = function() {
return this._apply(DriverCollection.count, arguments);
};
Collection.prototype.createIndex = function() {
return this._apply(DriverCollection.createIndex, ensureCallback(arguments));
};
Collection.prototype.distinct = function() {
return this._apply(DriverCollection.distinct, arguments);
};
Collection.prototype.drop = function() {
return this._apply(DriverCollection.drop, ensureCallback(arguments));
};
Collection.prototype.dropIndex = function() {
return this._apply(DriverCollection.dropIndex, ensureCallback(arguments));
};
Collection.prototype.dropIndexes = function(callback) {
this.runCommand('dropIndexes', {index: '*'}, callback);
};
Collection.prototype.ensureIndex = function() {
return this._apply(DriverCollection.ensureIndex, ensureCallback(arguments));
};
Collection.prototype.isCapped = function() {
return this._apply(DriverCollection.isCapped, arguments);
};
Collection.prototype.mapReduce = function() {
return this._apply(DriverCollection.mapReduce, ensureCallback(arguments));
};
Collection.prototype.reIndex = function() {
return this._apply(DriverCollection.reIndex, ensureCallback(arguments));
};
Collection.prototype.stats = function() {
return this._apply(DriverCollection.stats, arguments);
};
Collection.prototype.find = function() {
var args = Array.prototype.slice.call(arguments);
var oncollection = this._get;
var oncursor = thunky(function(callback) {
args.push(callback);
oncollection(function(err, collection) {
if (err) return callback(err);
collection.find.apply(collection, args);
});
});
if (typeof args[args.length-1] === 'function') {
var callback = args.pop();
oncursor(function(err, cursor) {
if (err) return callback(err);
cursor.toArray(callback);
});
}
return new Cursor(oncursor);
var getDbName = function(connString) {
if (typeof connString !== 'string') return null;
var config = parse(connString);
return config.dbName;
};
Collection.prototype.findOne = function() { // see http://www.mongodb.org/display/DOCS/Queries+and+Cursors
var args = Array.prototype.slice.call(arguments);
var callback = args.pop();
this.find.apply(this, args).limit(1).next(callback);
};
Collection.prototype.findAndModify = function(options, callback) {
this._apply(DriverCollection.findAndModify, [options.query, options.sort || [], options.update || {}, {
new:!!options.new,
remove:!!options.remove,
upsert:!!options.upsert,
fields:options.fields
}, function(err, doc, obj) {
// If the findAndModify command returns an error, obj is undefined and the lastErrorObject
// property is added to the err argument instead.
// If the findAndModify command finds no matching document, when performing update or remove,
// no lastErrorObject is included (so we fake it).
(callback || noop)(err, doc, (obj && obj.lastErrorObject) || { n: 0 });
}]);
};
Collection.prototype.group = function(group, callback) {
this._apply(DriverCollection.group, [
group.key ? group.key : group.keyf,
group.cond,
group.initial,
group.reduce,
group.finalize,
true,
callback
]);
};
Collection.prototype.remove = function() {
var self = this;
var fn = getCallback(arguments);
var callback = function(err, count) {
if (err) return fn(err);
fn(err, { n: count });
};
if (arguments.length > 1 && arguments[1] === true) { // the justOne parameter
this.findOne(arguments[0], function(err, doc) {
if (err) return callback(err);
if (!doc) return callback(null, {n : 0});
self._apply(DriverCollection.remove, [doc, callback]);
module.exports = function(connString, cols) {
var dbname = getDbName(connString);
var onserver = thunky(function(cb) {
getTopology(connString, function(err, topology) {
if (err) return cb(err);
cb(null, topology);
});
return;
}
this._apply(DriverCollection.remove, arguments.length === 0 ? [{}, noop] : replaceCallback(arguments, callback));
};
Collection.prototype.insert = function() {
var args = arguments;
var fn = getCallback(arguments);
var callback = function(err, docs) {
if (err) return fn(err);
if (Array.isArray(args[0])) {
fn(err, docs, { n : 0});
} else {
fn(err, docs[0], { n : 0});
}
};
this._apply(DriverCollection.insert, replaceCallback(arguments, callback));
};
Collection.prototype.save = function() {
var self = this;
var args = arguments;
var fn = getCallback(arguments);
var callback = function(err, doc, lastErrorObject) {
if (err) return fn(err);
if (doc === 1) {
fn(err, args[0], lastErrorObject);
} else {
// The third parameter is a faked lastErrorObject
fn(err, doc, { n : 0});
}
};
this._apply(DriverCollection.save, replaceCallback(arguments, callback));
};
Collection.prototype.update = function() {
var fn = getCallback(arguments);
var callback = function(err, count, lastErrorObject) {
fn(err, lastErrorObject);
};
this._apply(DriverCollection.update, replaceCallback(arguments, callback));
};
Collection.prototype.getIndexes = function() {
this._apply(DriverCollection.indexes, arguments);
};
Collection.prototype.runCommand = function(cmd, opts, callback) {
callback = callback || noop;
opts = opts || {};
if (typeof opts === 'function') {
callback = opts;
}
this._get(function(err, collection) {
if (err) return callback(err);
var commandObject = {};
commandObject[cmd] = collection.collectionName;
Object.keys(opts).forEach(function(key) {
commandObject[key] = opts[key];
});
collection.db.command(commandObject, callback);
});
};
Collection.prototype.initializeUnorderedBulkOp = function() {
return this._initializeBulkOp(false);
};
Collection.prototype.initializeOrderedBulkOp = function() {
return this._initializeBulkOp(true);
};
Collection.prototype._initializeBulkOp = function(ordered) {
var self = this;
var onbulk = thunky(function(callback) {
self._get(function(err, collection) {
var fn = ordered ? collection.initializeOrderedBulkOp : collection.initializeUnorderedBulkOp;
callback(err, fn.call(collection));
})
});
return new Bulk(onbulk);
};
Collection.prototype.toString = function() {
return this._name;
};
Collection.prototype._apply = function(fn, args) {
this._get(function(err, collection) {
if (err) return getCallback(args)(err);
if (!collection.opts || getCallback(args) === noop) return fn.apply(collection, args);
var safe = collection.opts.safe;
collection.opts.safe = true;
fn.apply(collection, args);
collection.opts.safe = safe;
});
};
var toConnectionString = function(conf) { // backwards compat config map (use a connection string instead)
var options = [];
var hosts = conf.replSet ? conf.replSet.members || conf.replSet : [conf];
var auth = conf.username ? (conf.username+':'+conf.password+'@') : '';
hosts = hosts.map(function(server) {
if (typeof server === 'string') return server;
return (server.host || '127.0.0.1') + ':' + (server.port || 27017);
}).join(',');
if (conf.slaveOk) options.push('slaveOk=true');
return 'mongodb://'+auth+hosts+'/'+conf.db+'?'+options.join('&');
};
var parseConfig = function(cs) {
if (typeof cs === 'object' && cs) return toConnectionString(cs);
if (typeof cs !== 'string') throw new Error('connection string required'); // to avoid undef errors on bad conf
cs = cs.replace(/^\//, '');
if (cs.indexOf('/') < 0) return parseConfig('127.0.0.1/'+cs);
if (cs.indexOf('mongodb://') !== 0) return parseConfig('mongodb://'+cs);
return cs;
};
var Database = function(name, ondb) {
EventEmitter.call(this);
this._get = ondb;
this._name = name;
if (!this._name) {
var self = this;
this._get(function(err, db) {
self._name = db.databaseName;
});
}
};
util.inherits(Database, EventEmitter);
Database.prototype.close = function() {
return this._apply(DriverDb.close, ensureCallback(arguments));
};
Database.prototype.addUser = function() {
return this._apply(DriverDb.addUser, ensureCallback(arguments));
};
Database.prototype.dropDatabase = function() {
return this._apply(DriverDb.dropDatabase, ensureCallback(arguments));
};
Database.prototype.eval = function() {
return this._apply(DriverDb.eval, ensureCallback(arguments));
};
Database.prototype.removeUser = function() {
return this._apply(DriverDb.removeUser, ensureCallback(arguments));
};
Database.prototype.stats = function() {
return this._apply(DriverDb.stats, arguments);
};
Database.prototype.runCommand = function(opts, callback) {
callback = callback || noop;
if (typeof opts === 'string') {
var tmp = opts;
opts = {};
opts[tmp] = 1;
}
this._get(function(err, db) {
if (err) return callback(err);
if (opts.shutdown === undefined) return db.command(opts, callback);
// If the command in question is a shutdown, mongojs should shut down the server without crashing.
db.command(opts, function(err) {
db.close();
callback.apply(this, arguments);
});
});
};
Database.prototype.open = function(callback) {
this._get(callback); // a way to force open the db, 99.9% of the times this is not needed
};
Database.prototype.getCollectionNames = function(callback) {
this._get(function(err, db) {
db.collections(function(err, cols) {
if (err) return callback(err);
callback(null, cols.map(function(c) {
return c.collectionName;
}));
});
});
};
Database.prototype.createCollection = function(name, opts, callback) {
if (typeof opts === 'function') {
callback = opts;
opts = {};
}
opts = opts || {};
opts.strict = opts.strict !== false;
this._apply(DriverDb.createCollection, [name, opts, callback || noop]);
};
Database.prototype.collection = function(name) {
var self = this;
var oncollection = thunky(function(callback) {
self._get(function(err, db) {
if (err) return callback(err);
db.collection(name, callback);
});
});
return new Collection(this._name+'.'+name, oncollection);
};
Database.prototype.toString = function() {
return this._name;
};
Database.prototype.getLastError = function (callback) {
this.runCommand('getLastError', function(err, res) {
if (err) return callback(err);
callback(err, res.err);
});
};
Database.prototype.getLastErrorObj = function (callback) {
this.runCommand('getLastError', callback);
};
Database.prototype._apply = function(fn, args) {
this._get(function(err, db) {
if (err) return getCallback(args)(err);
fn.apply(db, args);
});
};
var isDriverDb = function(db) {
var result = true;
result = result && typeof db.admin === 'function';
result = result && typeof db.close === 'function';
result = result && typeof db.collection === 'function';
result = result && typeof db.collectionNames === 'function';
result = result && typeof db._get === 'undefined';
return result;
};
var isMongojsDb = function(db) {
var result = true;
result = result && typeof db.close === 'function';
result = result && typeof db.collection === 'function';
result = result && typeof db.getCollectionNames === 'function';
result = result && typeof db._get === 'function';
return result;
};
var connect = function(config, collections) {
if (isDriverDb(config)) {
var driverDb = config;
} else if (isMongojsDb(config)) {
var mongojsDb = config;
} else {
var connectionString = parseConfig(config);
var dbName = (connectionString.match(/\/([^\/\?]+)(\?|$)/) || [])[1] || 'db';
}
var ondb = thunky(function(callback) {
mongodb.Db.connect(connectionString, function(err, db) {
if (err) {
that.emit('error', err);
return callback(err);
}
that.client = db;
that.emit('ready');
db.on('error', function(err) {
process.nextTick(function() {
that.emit('error', err);
});
if (!dbname) {
dbname = connString._dbname;
onserver = thunky(function(cb) {
toMongodbCore(connString, function(err, server) {
if (err) cb(new Error('You must pass a connection string or a mongojs instance.'));
cb(null, server);
});
callback(null, db);
});
});
if (driverDb) {
ondb = function(callback) {
callback(null, driverDb);
};
}
if (mongojsDb) {
ondb = function(callback) {
mongojsDb.open(function(err, db) {
if (err) callback(err);
callback(null, db);
});
}
}
var that = new Database(dbName, ondb);
that.bson = mongodb.BSONPure; // backwards compat (require('bson') instead)
that.ObjectId = mongodb.ObjectID; // backwards compat
collections = collections || config.collections || [];
collections.forEach(function(colName) {
var parts = colName.split('.');
var last = parts.pop();
var parent = parts.reduce(function(parent, prefix) {
return parent[prefix] = parent[prefix] || {};
}, that);
parent[last] = that.collection(colName);
});
var that = new Database({name: dbname, cols: cols}, onserver);
if (typeof Proxy !== 'undefined') {
var p = Proxy.create({
get: function(obj, prop) {
if (!that[prop]) that[prop] = that.collection(prop);
if (that[prop]) return that[prop];
that[prop] = that.collection(prop);
return that[prop];
}
});
return p;
}
};
return that;
};
connect.connect = connect; // backwards compat
// expose bson stuff visible in the shell
connect.ObjectId = mongodb.ObjectID;
connect.DBRef = mongodb.DBRef;
connect.Timestamp = mongodb.Timestamp;
connect.MinKey = mongodb.MinKey;
connect.MaxKey = mongodb.MaxKey;
connect.NumberLong = mongodb.Long;
connect.Cursor = Cursor;
connect.Collection = Collection;
connect.Database = Database;
module.exports = connect;

25
lib/aggregation-cursor.js

@ -0,0 +1,25 @@
var util = require('util');
var thunky = require('thunky');
var Cursor = require('./cursor');
var Readable = require('stream').Readable || require('readable-stream');
var AggregationCursor = function(opts) {
Cursor.call(this, opts);
var onserver = this._opts.onserver;
var self = this;
this._get = thunky(function(cb) {
onserver(function(err, server) {
if (err) return cb(err);
cb(null, server.cursor(self._opts.fullCollectionName, {
aggregate: self._opts.colName,
pipeline: self._opts.pipeline,
cursor: {batchSize: 1000}
}, {cursor: {batchSize: 1000}}));
});
});
};
util.inherits(AggregationCursor, Cursor);
module.exports = AggregationCursor;

159
lib/bulk.js

@ -0,0 +1,159 @@
var mongodb = require('mongodb-core');
var each = require('each-series');
var oid = mongodb.BSON.ObjectID.createPk;
var Bulk = function(colName, ordered, onserver, dbname) {
this._colname = colName;
this._cmds = [];
this._currCmd = null;
this._ordered = ordered;
this._onserver = onserver;
this._dbname = dbname;
var self = this;
this.find = function(query) {
var findobj = {};
var remove = function(lim) {
if (!self._currCmd) {
self._currCmd = {
delete: self._colname,
deletes: [],
ordered: self._ordered,
writeConcern: {w: 1}
};
}
if (!self._currCmd.delete) {
self._cmds.push(self._currCmd);
self._currCmd = {
delete: self._colname,
deletes: [],
ordered: self._ordered,
writeConcern: {w: 1}
};
}
self._currCmd.deletes.push({q: query, limit: lim});
};
var update = function(updObj, multi) {
if (!self._currCmd) {
self._currCmd = {
update: self._colname,
updates: [],
ordered: self._ordered,
writeConcern: {w: 1}
};
}
if (!self._currCmd.update) {
self._cmds.push(self._currCmd);
self._currCmd = {
update: self._colname,
updates: [],
ordered: self._ordered,
writeConcern: {w: 1}
};
}
self._currCmd.updates.push({q: query, u: updObj, multi: multi, upsert: false});
};
findobj.remove = function() {
remove(0);
};
findobj.removeOne = function() {
remove(1);
};
findobj.update = function(updObj) {
update(updObj, true);
};
findobj.updateOne = function(updObj) {
update(updObj, false);
};
return findobj;
};
};
Bulk.prototype.insert = function(doc) {
if (!this._currCmd) {
this._currCmd = {
insert: this._colname,
documents: [],
ordered: this._ordered,
writeConcern: {w: 1}
};
}
if (!this._currCmd.insert) {
this._cmds.push(this._currCmd);
this._currCmd = {
insert: this._colname,
documents: [],
ordered: this._ordered,
writeConcern: {w: 1}
};
}
if (!doc._id) doc._id = oid();
this._currCmd.documents.push(doc);
};
var cmdkeys = {
insert: 'nInserted',
delete: 'nRemoved',
update: 'nUpserted'
};
Bulk.prototype.tojson = function() {
var obj = {
nInsertOps: 0,
nUpdateOps: 0,
nRemoveOps: 0,
nBatches: this._cmds.length
};
this._cmds.forEach(function(cmd) {
if (cmd.update) {
obj.nUpdateOps += cmd.updates.length;
} else if (cmd.insert) {
obj.nInsertOps += cmd.documents.length;
} else if (cmd.delete) {
obj.nRemoveOps += cmd.deletes.length;
}
});
return obj;
};
Bulk.prototype.execute = function(cb) {
var self = this;
var result = {
writeErrors : [ ],
writeConcernErrors : [ ],
nInserted : 0,
nUpserted : 0,
nMatched : 0,
nModified : 0,
nRemoved : 0,
upserted : [ ]
};
this._cmds.push(this._currCmd);
this._onserver(function(err, server) {
if (err) return cb(err);
each(self._cmds, function(cmd, i, done) {
server.command(self._dbname + '.$cmd', cmd, function(err, res) {
if (err) return done(err);
result[cmdkeys[Object.keys(cmd)[0]]] += res.result.n;
done();
});
}, function(err) {
if (err) return cb(err);
result.ok = 1;
cb(null, result);
});
});
};
module.exports = Bulk;

289
lib/collection.js

@ -0,0 +1,289 @@
var mongodb = require('mongodb-core');
var thunky = require('thunky');
var concat = require('concat-stream');
var pump = require('pump');
var once = require('once');
var PassThrough = require('stream').PassThrough || require('readable-stream').PassThrough;
var Cursor = require('./cursor');
var AggregationCursor = require('./aggregation-cursor');
var Bulk = require('./bulk');
var writeOpts = {writeConcern: {w:1}, ordered:true};
var noop = function() {};
var oid = mongodb.BSON.ObjectID.createPk;
var Code = mongodb.BSON.Code;
var indexName = function(index) {
return Object.keys(index).map(function(key) {
return key + '_' + index[key];
}).join('_');
};
var Collection = function(opts, getServer) {
this._name = opts.name;
this._dbname = opts.dbname;
this._getServer = getServer;
};
Collection.prototype._fullColName = function() {
return this._dbname + '.' + this._name
};
Collection.prototype.find = function(query, projection, cb) {
if (typeof query === 'function') return this.find({}, null, query);
if (typeof projection === 'function') return this.find(query, null, projection);
var cursor = new Cursor({
query: query,
projection: projection,
onserver: this._getServer,
fullCollectionName: this._fullColName()
});
if (cb) return cursor.toArray(cb);
return cursor;
};
Collection.prototype.findOne = function(query, projection, cb) {
if (typeof query === 'function') return this.findOne({}, null, query);
if (typeof projection === 'function') return this.findOne(query, null, projection);
this.find(query, projection).next(function(err, doc) {
if (err) return cb(err);
cb(null, doc);
});
};
Collection.prototype.findAndModify = function(opts, cb) {
this.runCommand('findAndModify', opts, function(err, result) {
if (err) return cb(err);
cb(null, result.value, result.lastErrorObject || {n: 0});
});
};
Collection.prototype.count = function(query, cb) {
if (typeof query === 'function') return this.count({}, query);
this.find(query).count(cb);
};
Collection.prototype.distinct = function(field, query, cb) {
this.runCommand('distinct', {key: field, query: query}, function(err, result) {
if (err) return cb(err);
cb(null, result.values);
});
};
Collection.prototype.insert = function(docOrDocs, cb) {
cb = cb || noop;
var self = this;
this._getServer(function(err, server) {
if (err) return cb(err);
var docs = Array.isArray(docOrDocs) ? docOrDocs: [docOrDocs];
for (var i = 0; i < docs.length; i++) {
if (!docs[i]._id) docs[i]._id = oid();
}
server.insert(self._fullColName(), docs, writeOpts, function(err, res) {
if (err) return cb(err);
cb(null, docOrDocs);
});
});
};
Collection.prototype.update = function(query, update, opts, cb) {
if (!opts && !cb) return this.update(query, update, {}, noop);
if (typeof opts === 'function') return this.update(query, update, {}, opts);
cb = cb || noop;
var self = this;
this._getServer(function(err, server) {
if (err) return cb(err);
opts.q = query;
opts.u = update;
server.update(self._fullColName(), [opts], writeOpts, function(err, res) {
if (err) return cb(err);
cb(null, res.result);
});
});
};
Collection.prototype.save = function(doc, cb) {
cb = cb || noop;
if (doc._id) {
this.update({_id: doc._id}, doc, {upsert: true}, function(err, result) {
if (err) return cb(err);
cb(null, doc);
});
} else {
this.insert(doc, cb);
}
};
Collection.prototype.remove = function(query, justOne, cb) {
if (typeof query === 'function') return this.remove({}, false, query);
if (typeof justOne === 'function') return this.remove(query, false, justOne);
var self = this;
this._getServer(function(err, server) {
if (err) return cb(err);
server.remove(self._fullColName(), [{q: query, limit: justOne ? 1 : 0}], writeOpts, function(err, res) {
if (err) return cb(err);
cb(null, res.result)
});
});
};
Collection.prototype.drop = function(cb) {
this.runCommand('drop', cb);
};
Collection.prototype.stats = function(cb) {
this.runCommand('collStats', cb);
};
Collection.prototype.mapReduce = function(map, reduce, opts, cb) {
this.runCommand('mapReduce', {
map: map.toString(),
reduce: reduce.toString(),
query: opts.query || {},
out: opts.out
}, cb);
};
Collection.prototype.runCommand = function(cmd, opts, cb) {
if (typeof opts === 'function') return this.runCommand(cmd, null, opts);
var self = this;
opts = opts || {};
var cmdObject = {};
cmdObject[cmd] = this._name;
Object.keys(opts).forEach(function(key) {
cmdObject[key] = opts[key];
});
this._getServer(function(err, server) {
if (err) return cb(err);
server.command(self._dbname + '.$cmd', cmdObject, function(err, result) {
if (err) return cb(err);
cb(null, result.result);
});
});
};
Collection.prototype.toString = function() {
return this._name;
};
Collection.prototype.dropIndexes = function(cb) {
this.runCommand('dropIndexes', {index: '*'}, cb);
};
Collection.prototype.dropIndex = function(index, cb) {
this.runCommand('dropIndexes', {index: index}, cb);
};
Collection.prototype.createIndex = function(index, opts, cb) {
if (typeof opts === 'function') return this.createIndex(index, {}, opts);
if (typeof opts === 'undefined') return this.createIndex(index, {}, noop);
opts.name = indexName(index);
opts.key = index;
this.runCommand('createIndexes', {indexes: [opts]}, cb);
};
Collection.prototype.ensureIndex = function(index, opts, cb) {
this.createIndex(index, opts, cb);
};
Collection.prototype.getIndexes = function(cb) {
var cursor = new Cursor({
query: {ns: this._fullColName()},
projection: {},
onserver: this._getServer,
fullCollectionName: this._dbname + '.system.indexes'
});
cursor.toArray(cb);
};
Collection.prototype.reIndex = function(cb) {
this.runCommand('reIndex', cb);
};
Collection.prototype.isCapped = function(cb) {
var cursor = new Cursor({
query: {name: this._fullColName()},
projection: {},
onserver: this._getServer,
fullCollectionName: this._dbname + '.system.namespaces'
});
cursor.toArray(function(err, cols) {
if (err) return cb(err);
cb(null, (cols[0].options && cols[0].options.capped) || false);
});
};
Collection.prototype.stats = function(cb) {
this.runCommand('collStats', cb);
};
Collection.prototype.group = function(doc, cb) {
var obj = {};
var cmd = {
group: {
ns: this._name,
key: doc.key,
initial: doc.initial,
$reduce: new Code(doc.reduce.toString()),
out: 'inline',
cond: doc.cond
}
};
if (doc.finalize) cmd.group['finalize'] = new Code(doc.finalize.toString());
if (doc.keys) cmd.group.$keyf = new Code(doc.keys.toString());
var self = this;
this._getServer(function(err, server) {
if (err) return cb(err);
server.command(self._dbname + '.$cmd', cmd, function(err, result) {
if (err) return cb(err);
cb(null, result.result.retval);
});
});
};
Collection.prototype.aggregate = function() {
var cb;
var pipeline = Array.prototype.slice.call(arguments);
if (typeof pipeline[pipeline.length - 1] === 'function') {
cb = once(pipeline.pop());
}
if (cb) {
this.runCommand('aggregate', {pipeline: pipeline}, function(err, res) {
if (err) return cb(err);
cb(null, res.result);
});
return;
}
var strm = new AggregationCursor({
onserver: this._getServer,
colName: this._name,
fullCollectionName: this._fullColName(),
pipeline: pipeline
});
return strm;
};
Collection.prototype.initializeOrderedBulkOp = function() {
return new Bulk(this._name, true, this._getServer, this._dbname);
};
Collection.prototype.initializeUnorderedBulkOp = function() {
return new Bulk(this._name, false, this._getServer, this._dbname);
};
module.exports = Collection;

181
lib/cursor.js

@ -0,0 +1,181 @@
var util = require('util');
var pump = require('pump');
var concat = require('concat-stream');
var thunky = require('thunky');
var Readable = require('stream').Readable || require('readable-stream');
var noop = function() {};
var getCallback = function(args) {
var callback = args[args.length-1];
return typeof callback === 'function' ? callback : noop;
};
var Cursor = function(opts) {
Readable.call(this, {objectMode:true, highWaterMark:0});
this._opts = opts;
var onserver = this._opts.onserver;
var self = this;
this._get = thunky(function(cb) {
onserver(function(err, server) {
if (err) return cb(err);
cb(null, server.cursor(self._opts.fullCollectionName, {
find: self._opts.fullCollectionName,
query: self._opts.query || {},
fields: self._opts.projection,
sort: self._opts.sort,
skip: self._opts.skip,
limit: self._opts.limit,
batchSize: self._opts.batchSize,
explain: self._opts.explain
}));
});
});
};
util.inherits(Cursor, Readable);
Cursor.prototype.next = function(cb) {
return this._apply('next', arguments);
};
Cursor.prototype.rewind = function(cb) {
return this._apply('rewind', arguments);
};
Cursor.prototype.toArray = function(cb) {
var array = [];
var self = this;
var loop = function() {
self.next(function(err, obj) {
if (err) return cb(err);
if (!obj) return cb(null, array);
array.push(obj);
loop();
});
};
loop();
};
Cursor.prototype.map = function(mapfn, cb) {
var array = [];
var self = this;
var loop = function() {
self.next(function(err, obj) {
if (err) return cb(err);
if (!obj) return cb(null, array);
array.push(mapfn(obj));
loop();
});
};
loop();
};
Cursor.prototype.forEach = function(fn) {
var array = [];
var self = this;
var loop = function() {
self.next(function(err, obj) {
fn(err, obj);
loop();
});
};
loop();
};
Cursor.prototype.limit = function(n, cb) {
this._opts.limit = n;
if (cb) return this.toArray(cb);
return this;
};
Cursor.prototype.skip = function(n, cb) {
this._opts.skip = n;
if (cb) return this.toArray(cb);
return this;
};
Cursor.prototype.batchSize = function(n, cb) {
this._opts.batchSize = n;
if (cb) return this.toArray(cb);
return this;
};
Cursor.prototype.sort = function(sortObj, cb) {
this._opts.sort = sortObj;
if (cb) return this.toArray(cb);
return this;
};
Cursor.prototype.count = function(cb) {
var self = this;
var onserver = this._opts.onserver;
var dbname = this._opts.fullCollectionName.split('.')[0];
var colname = this._opts.fullCollectionName.split('.')[1];
onserver(function(err, server) {
if (err) return cb(err);
server.command(dbname + '.$cmd', {count: colname, query: self._opts.query}, function(err, result) {
if (err) return cb(err);
cb(null, result.result.n);
});
});
};
Cursor.prototype.size = function(cb) {
var self = this;
var onserver = this._opts.onserver;
var dbname = this._opts.fullCollectionName.split('.')[0];
var colname = this._opts.fullCollectionName.split('.')[1];
onserver(function(err, server) {
if (err) return cb(err);
var cmd = {count: colname};
if (self._opts.query) cmd.query = self._opts.query;
if (self._opts.limit) cmd.limit = self._opts.limit;
if (self._opts.skip) cmd.skip = self._opts.skip;
server.command(dbname + '.$cmd', cmd, function(err, result) {
if (err) return cb(err);
cb(null, result.result.n);
});
});
};
Cursor.prototype.explain = function(cb) {
var q = this._opts.query || {};
this._opts.query = {$query: q, $explain: 1};
this.next(cb);
};
Cursor.prototype.destroy = function() {
this._get(function(err, cursor) {
if (!cursor.close) return;
cursor.close();
});
};
Cursor.prototype._read = function() {
var self = this;
this.next(function(err, data) {
if (err) return self.emit('error', err);
self.push(data);
});
};
Cursor.prototype._apply = function(fn, args) {
this._get(function(err, cursor) {
if (err) return getCallback(args)(err);
cursor[fn].apply(cursor, args);
});
return this;
};
module.exports = Cursor;

125
lib/database.js

@ -0,0 +1,125 @@
var Collection = require('./collection');
var bson = require('mongodb-core').BSON;
var xtend = require('xtend');
var noop = function() {};
var Database = function(opts, onserver) {
this._getServer = onserver;
this._dbname = opts.name;
var self = this;
this.ObjectId = bson.ObjectId;
opts.cols = opts.cols || [];
opts.cols.forEach(function(colName) {
self[colName] = self.collection(colName);
var parts = colName.split('.');
var last = parts.pop();
var parent = parts.reduce(function(parent, prefix) {
return parent[prefix] = parent[prefix] || {};
}, self);
parent[last] = self.collection(colName);
});
};
Database.prototype.collection = function(colName) {
return new Collection({name: colName, dbname: this._dbname}, this._getServer);
};
Database.prototype.close = function(cb) {
cb = cb || noop;
this._getServer(function(err, server) {
if (err) return cb(err);
server.destroy();
cb();
});
};
Database.prototype.runCommand = function(opts, cb) {
cb = cb || noop;
if (typeof opts === 'string') {
var tmp = opts;
opts = {};
opts[tmp] = 1;
}
var self = this;
this._getServer(function(err, server) {
if (err) return cb(err);
server.command(self._dbname + '.$cmd', opts, function(err, result) {
if (err) return cb(err);
cb(null, result.result);
});
});
};
Database.prototype.getCollectionNames = function(cb) {
this.collection('system.namespaces').find({name:/^((?!\$).)*$/}, function(err, cols) {
if (err) return cb(err);
cb(null, cols.map(function(col) {
return col.name.split('.').splice(1).join('.');
}));
});
};
Database.prototype.createCollection = function(name, opts, cb) {
if (typeof opts === 'function') return this.createCollection(name, {}, opts);
var cmd = {create: name};
Object.keys(opts).forEach(function(opt) {
cmd[opt] = opts[opt];
});
this.runCommand(cmd, cb);
};
Database.prototype.stats = function(scale, cb) {
if (typeof scale === 'function') return this.stats(1, scale);
this.runCommand({dbStats:1, scale: scale}, cb);
};
Database.prototype.dropDatabase = function(cb) {
this.runCommand('dropDatabase', cb);
};
Database.prototype.createUser = function(usr, cb) {
var cmd = xtend({createUser: usr.user}, usr);
delete cmd.user;
this.runCommand(cmd, cb);
};
Database.prototype.addUser = Database.prototype.createUser;
Database.prototype.dropUser = function(username, cb) {
this.runCommand({dropUser: username}, cb);
};
Database.prototype.removeUser = Database.prototype.dropUser;
Database.prototype.eval = function(fn) {
var cb = arguments[arguments.length - 1];
this.runCommand({
eval: fn.toString(),
args: Array.prototype.slice.call(arguments, 1, arguments.length - 1)
}, function(err, res) {
if (err) return cb(err);
cb(null, res.retval);
})
};
Database.prototype.getLastErrorObj = function(cb) {
this.runCommand('getLastError', cb);
};
Database.prototype.getLastError = function(cb) {
this.runCommand('getLastError', function(err, res) {
if (err) return cb(err);
cb(null, res.err);
});
};
Database.prototype.toString = function() {
return this._dbname;
};
module.exports = Database;

48
lib/get-topology.js

@ -0,0 +1,48 @@
var once = require('once');
var parse = require('parse-mongo-url');
var mongodb = require('mongodb-core');
var Server = mongodb.Server;
var ReplSet = mongodb.ReplSet;
var MongoCR = mongodb.MongoCR;
module.exports = function(connString, cb) {
cb = once(cb);
var config = parse(connString);
var srv;
if (config.servers.length === 1) {
var opts = config.server_options;
opts.host = config.servers[0].host || 'localhost';
opts.port = config.servers[0].port || 27017;
opts.reconnect = true;
opts.reconnectInterval = 50;
srv = new Server(opts);
} else {
var rsopts = config.rs_options;
rsopts.setName = rsopts.rs_name
rsopts.reconnect = true;
rsopts.reconnectInterval = 50;
srv = new ReplSet(config.servers, rsopts);
}
if (config.auth) {
srv.addAuthProvider('mongocr', new MongoCR());
srv.on('connect', function(server) {
server.auth('mongocr', config.dbName, config.auth.user, config.auth.password, function(err, r) {
if (err) return cb(err);
cb(null, r);
});
});
} else {
srv.on('connect', function(server) {
cb(null, server);
});
}
srv.on('error', function(err) {
cb(err);
});
srv.connect();
};

15
package.json

@ -25,16 +25,21 @@
"Kevin McTigue"
],
"dependencies": {
"thunky": "~0.1.0",
"concat-stream": "^1.4.6",
"each-series": "^0.2.0",
"mongodb-core": "1.1.9",
"once": "^1.3.1",
"parse-mongo-url": "^1.1.0",
"pump": "^1.0.0",
"readable-stream": "~1.1.9",
"mongodb": "1.4.32"
"thunky": "~0.1.0",
"to-mongodb-core": "^2.0.0",
"xtend": "^4.0.0"
},
"scripts": {
"test": "tape test/test-*.js; echo \"Harmony tests\"; node --harmony node_modules/tape/bin/tape test/test-*.js"
},
"devDependencies": {
"concat-stream": "^1.4.6",
"each-series": "^0.2.0",
"tape": "^2.13.4"
"tape": "^3.4.0"
}
}

6
test/insert.js

@ -20,3 +20,9 @@ module.exports = function(testName, docs, testFn) {
});
});
};
module.exports.skip = function(testName) {
test.skip(testName, function(t) {
t.end();
});
};

11
test/test-aggregate.js

@ -11,18 +11,27 @@ insert('aggregate', [{
name:'Lapras' , type:'water'
}], function(db, t, done) {
db.a.aggregate({$group: {_id: '$type'}}, function(err, types) {
console.log(err, types);
var arr = types.map(function(x) {return x._id});
t.equal(types.length, 2);
t.notEqual(arr.indexOf('fire'), -1);
t.notEqual(arr.indexOf('water'), -1);
// test as a stream
db.a.aggregate({$group: {_id: '$type'}}).pipe(concat(function(types) {
var strm = db.a.aggregate({$group: {_id: '$type'}})
strm.pipe(concat(function(types) {
var arr = types.map(function(x) {return x._id});
t.equal(types.length, 2);
t.notEqual(arr.indexOf('fire'), -1);
t.notEqual(arr.indexOf('water'), -1);
t.end();
}));
strm.on('error', function(err) {
// Aggregation cursors are only supported on mongodb 2.6+
// this shouldn't fail the tests for other versions of mongodb
if (err.message === 'unrecognized field "cursor') t.ok(1);
else t.fail(err);
t.end();
});
});
});

6
test/test-cursor-explain.js

@ -8,7 +8,11 @@ insert('cursor.explain', [{
}], function(db, t, done) {
var cursor = db.a.find();
cursor.explain(function(err, result) {
t.equal(result.nscannedObjects, 2);
if (result.executionStats) {
t.equal(result.executionStats.totalDocsExamined, 2);
} else {
t.equal(result.nscannedObjects, 2);
}
done();
});
});

2
test/test-cursor-foreach.js

@ -8,7 +8,7 @@ var pokemons = [{
name:'Lapras' , type:'water'
}];
insert('remove', pokemons, function(db, t, done) {
insert('cursor foreach', pokemons, function(db, t, done) {
var i = 0;
db.a.find().forEach(function(err, pkm) {
if (++i === 4) return t.end();

8
test/test-drop-indexes.js

@ -1,7 +1,7 @@
var insert = require('./insert');
var concat = require('concat-stream');
insert('aggregate', [{
insert('drop indexes', [{
name:'Squirtle', type:'water'
}, {
name:'Starmie' , type:'water'
@ -11,6 +11,12 @@ insert('aggregate', [{
name:'Lapras' , type:'water'
}], function(db, t, done) {
db.a.ensureIndex({type: 1}, function(err) {
if (err && err.message === 'no such cmd: createIndexes') {
// Index creation and deletion not supported for mongodb 2.4 and lower.
t.ok(true);
t.end();
return;
}
t.ok(!err);
db.a.getIndexes(function(err, indexes) {
t.ok(!err);

2
test/test-find-sort.js

@ -1,7 +1,7 @@
var insert = require('./insert');
var concat = require('concat-stream');
insert('aggregate', [{
insert('sort', [{
name:'Squirtle', type:'water'
}, {
name:'Starmie' , type:'water'

22
test/test-iscapped.js

@ -0,0 +1,22 @@
var test = require('./tape');
var mongojs = require('../index');
var db = mongojs('test', ['a', 'mycappedcol']);
test('isCapped', function(t) {
db.mycappedcol.drop(function(err) {
db.createCollection('mycappedcol', {capped: true, size: 1024}, function(err) {
t.notOk(err);
db.mycappedcol.isCapped(function(err, ic) {
t.notOk(err);
t.ok(ic);
db.a.insert({}, function(err) {
t.notOk(err);
db.a.isCapped(function(err, ic2) {
t.notOk(ic2);
db.mycappedcol.drop(t.end.bind(t));
});
});
});
});
});
});

45
test/test-pass-driver-db.js

@ -1,36 +1,31 @@
var test = require('./tape');
var mongodb = require('mongodb');
var mongojs = require('../');
var each = require('each-series');
test('receive a driver db or mongojs instance', function(t) {
mongodb.Db.connect('mongodb://localhost/test', function(err, thedb) {
var doTests = function(db, i, callback) {
var afterFind = function() {
db.a.remove(function(err) {
t.ok(!err);
t.equal(db.toString(), 'test');
callback();
});
};
var db = mongojs(mongojs('test', []), ['a']);
var afterFind = function() {
db.a.remove(function(err) {
t.ok(!err);
t.equal(db.toString(), 'test');
t.end();
});
};
var afterInsert = function(err) {
t.ok(!err);
var afterInsert = function(err) {
t.ok(!err);
db.a.findOne(function(err, data) {
t.equal(data.name, 'Pidgey');
afterFind();
});
};
db.a.findOne(function(err, data) {
t.equal(data.name, 'Pidgey');
afterFind();
});
};
var afterRemove = function(err) {
t.ok(!err);
db.a.insert({name: 'Pidgey'}, afterInsert);
};
db.a.remove(afterRemove);
};
var afterRemove = function(err) {
t.ok(!err);
db.a.insert({name: 'Pidgey'}, afterInsert);
};
db.a.remove(afterRemove);
each([mongojs(thedb, ['a']), mongojs(mongojs('test', []), ['a'])], doTests, t.end.bind(t));
});
});

1
test/test-update-and-callback.js

@ -7,7 +7,6 @@ insert('update and callback', [{
db.a.update({hello:'world'}, {$set:{hello:'verden'}}, function(err, lastErrorObject) {
t.ok(!sync);
t.ok(!err);
t.equal(lastErrorObject.updatedExisting, true);
t.equal(lastErrorObject.n, 1);
done();

1
test/test-update-multi.js

@ -7,7 +7,6 @@ insert('update multi', [{
}], function(db, t, done) {
db.a.update({}, {$set:{updated:true}}, {multi:true}, function(err, lastErrorObject) {
t.ok(!err);
t.equal(lastErrorObject.updatedExisting, true);
t.equal(lastErrorObject.n, 2);
db.a.find(function(err, docs) {

1
test/test-update.js

@ -5,7 +5,6 @@ insert('update', [{
}], function(db, t, done) {
db.a.update({hello:'world'}, {$set:{hello:'verden'}}, function(err, lastErrorObject) {
t.ok(!err);
t.equal(lastErrorObject.updatedExisting, true);
t.equal(lastErrorObject.n, 1);
db.a.findOne(function(err, doc) {

Loading…
Cancel
Save