Skip to content

Commit

Permalink
Merge pull request #107 from faceair/master
Browse files Browse the repository at this point in the history
improve default plugins
  • Loading branch information
evantahler committed Jan 7, 2016
2 parents a11e6c1 + 0cb7eba commit 873c122
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 27 deletions.
73 changes: 59 additions & 14 deletions lib/plugins/simpleRetry.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// If the job fails, sleep, and re-enqueue it.
// You probably never want to use this in production
var crypto = require('crypto');

var simpleRetry = function(worker, func, queue, job, args, options){
var self = this;
Expand All @@ -10,37 +11,81 @@ var simpleRetry = function(worker, func, queue, job, args, options){
self.job = job;
self.args = args;
self.options = options;
if (! self.options.retryInterval) {
self.options.retryInterval = [60, 300, 600, 1800, 3600]
}

if(self.worker.queueObject){
self.queueObject = self.worker.queueObject;
}else{
self.queueObject = self.worker;
}

self.sleep = 1000;
if(self.options.sleep){ self.sleep = self.options.sleep; }
if (self.args) {
jobHash = crypto.createHash('md5').update(JSON.stringify(self.args)).digest('hex');
self.retryKey = self.queueObject.connection.key('retrytimes', jobHash);
}
};

////////////////////
// PLUGIN METHODS //
////////////////////

simpleRetry.prototype.updateRetryTimes = function(callback) {
var self = this;

self.queueObject.connection.redis.incr(self.retryKey, (function(err, result) {
if (err) { return callback(err); }

self.queueObject.connection.redis.expire(self.retryKey, self.options.retryInterval[self.options.retryInterval.length - 1] * 2, function(err) {
if (err) {
callback(err);
} else if (result > self.options.retryInterval.length) {
err.message = '(Resque Retry Max Attempts Reached) -> ' + err.message;
callback(err);
} else {
callback(null, result);
}
});
}));
};

simpleRetry.prototype.deleteRetryTimes = function(callback) {
this.queueObject.connection.redis.del(this.retryKey, callback);
};

simpleRetry.prototype.after_perform = function(callback){
// console.log("** after_perform")
var self = this;
if (self.worker.error) {
self.updateRetryTimes(function(err, retryTimes) {
if (err) {
if (err.message.indexOf('(Resque Retry Max Attempts Reached) -> ') == 0) {
self.deleteRetryTimes(function(err) {
return callback(err, true);
});
} else {
return callback(err);
}
}

var delay = self.options.retryInterval[retryTimes - 1];
self.queueObject.enqueueIn(delay * 1000, self.queue, self.func, self.args, function(err) {
if (err) { return callback(err); }

if(self.worker.error){
if(self.options.errorCollector){
self.options.errorCollector.push( self.worker.error );
}
self.worker.error = null;
self.queueObject.enqueueIn(self.sleep, self.queue, self.func, self.args, function(err){
callback(err, true);
self.worker.emit('reEnqueue', self.queue, self.job, {
times: retryTimes,
delay: delay,
err: self.worker.error
});
self.worker.error = null;
return callback(err, true);
});
});
}else{
callback(null, true);
}
} else {
self.deleteRetryTimes(function(err) {
return callback(err, true);
});
};
};

exports.simpleRetry = simpleRetry;
exports.simpleRetry = simpleRetry;
24 changes: 11 additions & 13 deletions test/plugins/simpleRetry.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ var should = require('should');

describe('plugins', function(){

var errorCollector = [];
var jobs = {
"brokenJob": {
plugins: [ 'simpleRetry' ],
pluginOptions: { simpleRetry: {
sleep: 100,
errorCollector: errorCollector,
retryInterval: [100]
},},
perform: function(a,b,callback){
callback(new Error("BROKEN"), null);
Expand All @@ -31,25 +29,25 @@ describe('plugins', function(){
});

describe('simpleRetry',function(){

it('bad job should not crash with simpleRetry', function(done){
queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){
queue.length(specHelper.queue, function(err, len){
len.should.equal(1);

var worker = new specHelper.NR.worker({
connection: specHelper.connectionDetails,
timeout: specHelper.timeout,
connection: specHelper.connectionDetails,
timeout: specHelper.timeout,
queues: specHelper.queue
}, jobs);

worker.connect(function(){

worker.on('success', function(q, job, result){
errorCollector.length.should.equal(1);
String(errorCollector[0]).should.equal('Error: BROKEN');
worker.end();
done();
specHelper.queue.should.equal(q);
queue.scheduledAt(specHelper.queue, "brokenJob", [1,2], function(err, timestamps){
timestamps.length.should.be.equal(1);
worker.end();
done();
});
});

worker.on('error', function(q, job, error){
Expand All @@ -63,4 +61,4 @@ describe('plugins', function(){
});

});
});
});

0 comments on commit 873c122

Please sign in to comment.