Browse Source

Merge pull request #242 from isocolsky/multiple_emailservices

Multiple emailservices
activeAddress
Matias Alejo Garcia 10 years ago
parent
commit
3310c637f6
  1. 103
      lib/emailservice.js
  2. 2
      lib/model/email.js
  3. 13
      lib/storage.js
  4. 30
      test/integration/server.js

103
lib/emailservice.js

@ -204,58 +204,69 @@ EmailService.prototype.sendEmail = function(notification, cb) {
self._getRecipientsList(notification, emailType, function(err, recipientsList) { self._getRecipientsList(notification, emailType, function(err, recipientsList) {
if (_.isEmpty(recipientsList)) return cb(); if (_.isEmpty(recipientsList)) return cb();
async.waterfall([ // TODO: Optimize so one process does not have to wait until all others are done
// Instead set a flag somewhere in the db to indicate that this process is free
// to serve another request.
self.lock.runLocked('email-' + notification.id, cb, function(cb) {
self.storage.fetchEmailByNotification(notification.id, function(err, email) {
if (err) return cb(err);
if (email) return cb();
function(next) { async.waterfall([
async.parallel([
function(next) { function(next) {
self._readTemplate(emailType.filename, next); async.parallel([
function(next) {
self._readTemplate(emailType.filename, next);
},
function(next) {
self._getDataForTemplate(notification, next);
},
], function(err, res) {
next(err, res[0], res[1]);
});
}, },
function(next) { function(template, data, next) {
self._getDataForTemplate(notification, next); self._applyTemplate(template, data, next);
}, },
], function(err, res) { function(content, next) {
next(err, res[0], res[1]); async.map(recipientsList, function(recipient, next) {
}); var email = Model.Email.create({
}, walletId: notification.walletId,
function(template, data, next) { copayerId: recipient.copayerId,
self._applyTemplate(template, data, next); from: self.from,
}, to: recipient.emailAddress,
function(content, next) { subject: content.subject,
async.map(recipientsList, function(recipient, next) { body: content.body,
var email = Model.Email.create({ notificationId: notification.id,
walletId: notification.walletId, });
copayerId: recipient.copayerId, self.storage.storeEmail(email, function(err) {
from: self.from, return next(err, email);
to: recipient.emailAddress, });
subject: content.subject, }, next);
body: content.body, },
}); function(emails, next) {
self.storage.storeEmail(email, function(err) { async.each(emails, function(email, next) {
return next(err, email); self._send(email, function(err) {
}); if (err) {
}, next); email.setFail();
}, } else {
function(emails, next) { email.setSent();
async.each(emails, function(email, next) { }
self._send(email, function(err) { self.storage.storeEmail(email, next);
if (err) { });
email.setFail(); }, function(err) {
} else { return next();
email.setSent(); });
} },
self.storage.storeEmail(email, next); ], function(err) {
}); if (err) {
}, function(err) { log.error('An error ocurred generating email notification', err);
return next(); }
return cb(err);
}); });
}, });
], function(err) {
if (err) {
log.error('An error ocurred generating email notification', err);
}
return cb(err);
}); });
}); });
}; };

2
lib/model/email.js

@ -24,6 +24,7 @@ Email.create = function(opts) {
x.status = 'pending'; x.status = 'pending';
x.attempts = 0; x.attempts = 0;
x.lastAttemptOn = null; x.lastAttemptOn = null;
x.notificationId = opts.notificationId;
return x; return x;
}; };
@ -41,6 +42,7 @@ Email.fromObj = function(obj) {
x.status = obj.status; x.status = obj.status;
x.attempts = obj.attempts; x.attempts = obj.attempts;
x.lastAttemptOn = obj.lastAttemptOn; x.lastAttemptOn = obj.lastAttemptOn;
x.notificationId = obj.notificationId;
return x; return x;
}; };

13
lib/storage.js

@ -52,6 +52,9 @@ Storage.prototype._createIndexes = function() {
this.db.collection(collections.ADDRESSES).createIndex({ this.db.collection(collections.ADDRESSES).createIndex({
address: 1, address: 1,
}); });
this.db.collection(collections.EMAIL_QUEUE).createIndex({
notificationId: 1,
});
}; };
Storage.prototype.connect = function(opts, cb) { Storage.prototype.connect = function(opts, cb) {
@ -407,6 +410,16 @@ Storage.prototype.fetchUnsentEmails = function(cb) {
}); });
}; };
Storage.prototype.fetchEmailByNotification = function(notificationId, cb) {
this.db.collection(collections.EMAIL_QUEUE).findOne({
notificationId: notificationId,
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return cb(null, Model.Email.fromObj(result));
});
};
Storage.prototype._dump = function(cb, fn) { Storage.prototype._dump = function(cb, fn) {
fn = fn || console.log; fn = fn || console.log;

30
test/integration/server.js

@ -496,6 +496,36 @@ describe('Wallet service', function() {
}); });
}); });
}); });
it('should support multiple emailservice instances running concurrently', function(done) {
var emailService2 = new EmailService();
emailService2.start({
lock: emailService.lock, // Use same locker service
messageBroker: server.messageBroker,
storage: storage,
mailer: mailerStub,
emailOpts: {
from: 'bws2@dummy.net',
subjectPrefix: '[test wallet 2]',
},
}, function(err) {
helpers.stubUtxos(server, wallet, 1, function() {
var txOpts = helpers.createProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 0.8, 'some message', TestData.copayers[0].privKey_1H_0);
server.createTx(txOpts, function(err, tx) {
should.not.exist(err);
setTimeout(function() {
var calls = mailerStub.sendMail.getCalls();
calls.length.should.equal(2);
server.storage.fetchUnsentEmails(function(err, unsent) {
should.not.exist(err);
unsent.should.be.empty;
done();
});
}, 100);
});
});
});
});
}); });

Loading…
Cancel
Save