You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

303 lines
6.8 KiB

#!/usr/bin/env node
'use strict';
var _ = require('lodash');
var $ = require('preconditions').singleton();
var async = require('async');
var log = require('npmlog');
log.debug = log.verbose;
log.disableColor();
var mongodb = require('mongodb');
var moment = require('moment');
var config = require('../config');
var storage = require('./storage');
// Earliest day taken into account: used as the default start of a Stats query
// range, and as the starting point for aggregation when a stats collection
// does not contain any record yet.
var INITIAL_DATE = '2015-01-01';
/**
 * Holds the parameters of a statistics query.
 *
 * @param {Object} [opts]
 * @param {string} [opts.network='livenet'] - Network whose records are reported.
 * @param {*} [opts.from=INITIAL_DATE] - Start of the range, passed to `moment()`.
 * @param {*} [opts.to] - End of the range, passed to `moment()` as-is.
 *
 * Besides `network`, `from` and `to`, the instance exposes `fromTs` / `toTs`:
 * the `valueOf()` of the start of the first day and of the end of the last day.
 * NOTE(review): moment's startOf/endOf presumably adjust the instance in place,
 * in which case `from` / `to` end up day-aligned as well -- confirm.
 */
function Stats(opts) {
  var options = opts || {};

  this.network = options.network || 'livenet';

  var rangeStart = moment(options.from || INITIAL_DATE);
  var rangeEnd = moment(options.to);

  this.from = rangeStart;
  this.to = rangeEnd;
  this.fromTs = rangeStart.startOf('day').valueOf();
  this.toTs = rangeEnd.endOf('day').valueOf();
}
/**
 * Opens a MongoDB connection, computes the statistics and hands them to `cb`.
 *
 * The connection URI is read from `config.storageOpts.mongoDb.uri`. The opened
 * handle is stored on `this.db` so the `_get*` helpers can query it, and it is
 * closed again before `cb` is invoked so the connection is not leaked.
 *
 * @param {Function} cb - Node-style callback, called as `cb(err, null)` when
 *   the connection fails, `cb(err)` when computing the stats fails, and
 *   `cb(null, stats)` on success.
 */
Stats.prototype.run = function(cb) {
  var self = this;
  var uri = config.storageOpts.mongoDb.uri;

  mongodb.MongoClient.connect(uri, function(err, db) {
    if (err) {
      log.error('Unable to connect to the mongoDB', err);
      return cb(err, null);
    }
    log.info('Connection established to ' + uri);
    self.db = db;

    self._getStats(function(statsErr, stats) {
      // Release the connection whether or not the stats could be computed;
      // an open handle would otherwise outlive this call.
      db.close(function(closeErr) {
        // A failed close does not invalidate stats that were already computed,
        // so it is only logged.
        if (closeErr) {
          log.error('Unable to close the mongoDB connection', closeErr);
        }
        if (statsErr) return cb(statsErr);
        return cb(null, stats);
      });
    });
  });
};
/**
 * Gathers every statistic through `async.parallel` and combines the results.
 *
 * @param {Function} cb - Node-style callback; on success it receives
 *   `{ newWallets: <result of _getNewWallets>, txProposals: <result of _getTxProposals> }`,
 *   on failure it receives only the error.
 */
Stats.prototype._getStats = function(cb) {
  var stats = this;

  var tasks = [
    function fetchNewWallets(done) {
      stats._getNewWallets(done);
    },
    function fetchTxProposals(done) {
      stats._getTxProposals(done);
    },
  ];

  async.parallel(tasks, function onAllDone(err, outcomes) {
    if (err) return cb(err);

    // Results come back in task order.
    return cb(null, {
      newWallets: outcomes[0],
      txProposals: outcomes[1],
    });
  });
};
/**
 * Reports the number of wallets created per day for `this.network` within the
 * [`this.fromTs`, `this.toTs`] range.
 *
 * Daily counts are cached in the `stats_wallets` collection. When the most
 * recent cached day is older than yesterday, the cache is first extended with
 * a map-reduce over the wallets collection (up to the end of yesterday); the
 * requested range is then read back from the cache.
 *
 * @param {Function} cb - Node-style callback `cb(err, stats)` where `stats` is
 *   `{ byDay: [{ day: 'YYYYMMDD', count: <number> }, ...] }`, ordered by day.
 */
Stats.prototype._getNewWallets = function(cb) {
  var self = this;

  // Yields the most recent day present in the cache, or INITIAL_DATE when the
  // cache is empty.
  function getLastDate(cb) {
    self.db.collection('stats_wallets')
      .find({})
      .sort({
        '_id.day': -1
      })
      .limit(1)
      .toArray(function(err, lastRecord) {
        // A failed lookup must not be mistaken for an empty cache, which would
        // silently trigger a full re-aggregation from INITIAL_DATE.
        if (err) return cb(err);
        if (_.isEmpty(lastRecord)) return cb(null, moment(INITIAL_DATE));
        return cb(null, moment(lastRecord[0]._id.day));
      });
  }

  // Aggregates wallets created from `from` up to the end of yesterday and
  // merges the per-day counts into the cache.
  function updateStats(from, cb) {
    var to = moment().subtract(1, 'day').endOf('day');

    // `map` and `reduce` are handed to mapReduce and evaluated by MongoDB, so
    // they must stay self-contained (no references to outer variables).
    var map = function() {
      // createdOn is scaled by 1000 to build the Date, then truncated to the
      // day to form the bucket key.
      var day = new Date(this.createdOn * 1000);
      day.setHours(0);
      day.setMinutes(0);
      day.setSeconds(0);
      var key = {
        day: +day,
        network: this.network,
      };
      var value = {
        count: 1
      };
      emit(key, value);
    };

    var reduce = function(k, v) {
      var count = 0;
      for (var i = 0; i < v.length; i++) {
        count += v[i].count;
      }
      return {
        count: count,
      };
    };

    var opts = {
      query: {
        createdOn: {
          // Inclusive lower bound: `from` is the start of the last cached day,
          // whose bucket is recomputed and replaced by the merge below, so a
          // wallet stamped exactly at that instant must still be counted.
          $gte: from.unix(),
          $lte: to.unix(),
        },
      },
      out: {
        merge: 'stats_wallets',
      }
    };

    self.db.collection(storage.collections.WALLETS)
      .mapReduce(map, reduce, opts, function(err) {
        return cb(err);
      });
  }

  // Reads the cached daily counts for the requested network and range.
  function queryStats(cb) {
    self.db.collection('stats_wallets')
      .find({
        '_id.network': self.network,
        '_id.day': {
          $gte: self.fromTs,
          $lte: self.toTs,
        },
      })
      .sort({
        '_id.day': 1
      })
      .toArray(function(err, results) {
        if (err) return cb(err);
        var stats = {};
        stats.byDay = _.map(results, function(record) {
          var day = moment(record._id.day).format('YYYYMMDD');
          return {
            day: day,
            count: record.value.count,
          };
        });
        return cb(null, stats);
      });
  }

  async.series([
      function(next) {
        getLastDate(function(err, lastDate) {
          if (err) return next(err);

          lastDate = lastDate.startOf('day');
          var yesterday = moment().subtract(1, 'day').startOf('day');
          if (lastDate.isBefore(yesterday)) {
            // Needs update
            return updateStats(lastDate, next);
          }
          next();
        });
      },
      function(next) {
        queryStats(next);
      },
    ],
    function(err, res) {
      if (err) {
        log.error(err);
        // Do not touch `res` on failure: the second step may never have run.
        return cb(err);
      }
      return cb(null, res[1]);
    });
};
/**
 * Reports, per day, the number and the total amount of broadcasted transaction
 * proposals for `this.network` within the [`this.fromTs`, `this.toTs`] range.
 *
 * Daily figures are cached in the `stats_txps` collection. When the most
 * recent cached day is older than yesterday, the cache is first extended with
 * a map-reduce over the transactions collection (up to the end of yesterday);
 * the requested range is then read back from the cache.
 *
 * @param {Function} cb - Node-style callback `cb(err, stats)` where `stats` is
 *   `{ nbByDay: [{ day: 'YYYYMMDD', count }, ...],
 *      amountByDay: [{ day: 'YYYYMMDD', amount }, ...] }`, both ordered by day.
 */
Stats.prototype._getTxProposals = function(cb) {
  var self = this;

  // Yields the most recent day present in the cache, or INITIAL_DATE when the
  // cache is empty.
  function getLastDate(cb) {
    self.db.collection('stats_txps')
      .find({})
      .sort({
        '_id.day': -1
      })
      .limit(1)
      .toArray(function(err, lastRecord) {
        // A failed lookup must not be mistaken for an empty cache, which would
        // silently trigger a full re-aggregation from INITIAL_DATE.
        if (err) return cb(err);
        if (_.isEmpty(lastRecord)) return cb(null, moment(INITIAL_DATE));
        return cb(null, moment(lastRecord[0]._id.day));
      });
  }

  // Aggregates proposals broadcasted from `from` up to the end of yesterday
  // and merges the per-day figures into the cache.
  function updateStats(from, cb) {
    var to = moment().subtract(1, 'day').endOf('day');

    // `map` and `reduce` are handed to mapReduce and evaluated by MongoDB, so
    // they must stay self-contained (no references to outer variables).
    var map = function() {
      // broadcastedOn is scaled by 1000 to build the Date, then truncated to
      // the day to form the bucket key.
      var day = new Date(this.broadcastedOn * 1000);
      day.setHours(0);
      day.setMinutes(0);
      day.setSeconds(0);
      var key = {
        day: +day,
        network: this.network,
      };
      var value = {
        count: 1,
        amount: this.amount
      };
      emit(key, value);
    };

    var reduce = function(k, v) {
      var count = 0,
        amount = 0;
      for (var i = 0; i < v.length; i++) {
        count += v[i].count;
        amount += v[i].amount;
      }
      return {
        count: count,
        amount: amount,
      };
    };

    var opts = {
      query: {
        status: 'broadcasted',
        broadcastedOn: {
          // Inclusive lower bound: `from` is the start of the last cached day,
          // whose bucket is recomputed and replaced by the merge below, so a
          // proposal stamped exactly at that instant must still be counted.
          $gte: from.unix(),
          $lte: to.unix(),
        },
      },
      out: {
        merge: 'stats_txps',
      }
    };

    self.db.collection(storage.collections.TXS)
      .mapReduce(map, reduce, opts, function(err) {
        return cb(err);
      });
  }

  // Reads the cached daily figures for the requested network and range.
  function queryStats(cb) {
    self.db.collection('stats_txps')
      .find({
        '_id.network': self.network,
        '_id.day': {
          $gte: self.fromTs,
          $lte: self.toTs,
        },
      })
      .sort({
        '_id.day': 1
      })
      .toArray(function(err, results) {
        if (err) return cb(err);
        var stats = {
          nbByDay: [],
          amountByDay: []
        };
        _.each(results, function(record) {
          var day = moment(record._id.day).format('YYYYMMDD');
          stats.nbByDay.push({
            day: day,
            count: record.value.count,
          });
          stats.amountByDay.push({
            day: day,
            amount: record.value.amount,
          });
        });
        return cb(null, stats);
      });
  }

  async.series([
      function(next) {
        getLastDate(function(err, lastDate) {
          if (err) return next(err);

          lastDate = lastDate.startOf('day');
          var yesterday = moment().subtract(1, 'day').startOf('day');
          if (lastDate.isBefore(yesterday)) {
            // Needs update
            return updateStats(lastDate, next);
          }
          next();
        });
      },
      function(next) {
        queryStats(next);
      },
    ],
    function(err, res) {
      if (err) {
        log.error(err);
        // Do not touch `res` on failure: the second step may never have run.
        return cb(err);
      }
      return cb(null, res[1]);
    });
};
module.exports = Stats;