diff --git a/lib/queue.js b/lib/queue.js index 86081c2a..2f5ab509 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -317,6 +317,81 @@ queue.prototype.allWorkingOn = function(callback){ }); }; +queue.prototype.forceCleanWorker = function(workerName, callback){ + var self = this; + self.workers(function(err, workers){ + var queues = workers[workerName]; + var errorPayload; + if(err){ callback(err); } + else if(!queues){ callback(new Error('worker not round')); } + else{ + self.workingOn(workerName, queues, function(err, workingOn){ + if(err){ callback(err); } + else if(workingOn){ + workingOn = JSON.parse(workingOn); + errorPayload = { + worker: workerName, + queue: workingOn.queue, + payload: workingOn.payload, + exception: 'Worker Timeout (killed manually)', + error: 'Worker Timeout (killed manually)', + backtrace: null, + failed_at: (new Date()).toString() + }; + self.connection.redis.incr(self.connection.key('stat', 'failed')); + self.connection.redis.incr(self.connection.key('stat', 'failed', workerName)); + self.connection.redis.rpush(self.connection.key('failed'), JSON.stringify(errorPayload)); + } + + self.connection.redis.del([ + self.connection.key('stat', 'failed', workerName), + self.connection.key('stat', 'processed', workerName), + self.connection.key('worker', workerName), + self.connection.redis.srem(self.connection.key('workers'), workerName + ':' + queues) + ], function(err, data){ + callback(err, errorPayload); + }); + + }); + } + }); +}; + +queue.prototype.cleanOldWorkers = function(age, callback){ + // note: this method will remove the data created by a "stuck" worker and move the payload to the error queue + // however, it will not actually remove any processes which may be running. A job *may* be running that you have removed + var self = this; + var results = {}; + self.allWorkingOn(function(err, data){ + if(err && typeof callback === 'function'){ + callback(err); + }else if((!data || hashLength(data) && typeof callback === 'function' ) === 0){ + callback(null, results); + }else{ + var started = 0; + for(var workerName in data){ + started++; + if(Date.now() - Date.parse(data[workerName].run_at) > age){ + self.forceCleanWorker(workerName, function(error, errorPayload){ + if(errorPayload && errorPayload.worker ){ results[errorPayload.worker] = errorPayload; } + started--; + if(started === 0 && typeof callback === 'function'){ + callback(null, results); + } + }); + }else{ + process.nextTick(function(){ + started--; + if(started === 0 && typeof callback === 'function'){ + callback(null, results); + } + }); + } + } + } + }); +}; + queue.prototype.failedCount = function(callback){ var self = this; self.connection.redis.llen(self.connection.key('failed'), function(err, length){ diff --git a/test/core/queue.js b/test/core/queue.js index cb1ae896..9c3eb7b1 100644 --- a/test/core/queue.js +++ b/test/core/queue.js @@ -402,7 +402,7 @@ describe('queue', function(){ describe('worker status', function(){ var workerA; var workerB; - var timeout = 100; + var timeout = 500; var jobs = { "slowJob": { @@ -479,6 +479,67 @@ describe('queue', function(){ queue.enqueue(specHelper.queue, "slowJob"); workerA.start(); }); + + it('can remove stuck workers', function(done){ + var age = 1; + var listener = workerA.on('job', function(q, job, failure){ + workerA.removeAllListeners('job'); + + queue.allWorkingOn(function(err, data){ + var paylaod = data['workerA'].payload; + paylaod.queue.should.equal('test_queue'); + paylaod.class.should.equal('slowJob'); + + queue.cleanOldWorkers(age, function(err, data){ + should.not.exist(err); + Object.keys(data).length.should.equal(1); + data.workerA.queue.should.equal('test_queue'); + data.workerA.worker.should.equal('workerA'); + data.workerA.payload.class.should.equal('slowJob'); + + specHelper.redis.rpop(specHelper.namespace + ":" + "failed", function(err, data){ + data = JSON.parse(data); + data.queue.should.equal(specHelper.queue); + data.exception.should.equal('Worker Timeout (killed manually)'); + data.error.should.equal('Worker Timeout (killed manually)'); + data.payload.class.should.equal('slowJob'); + + queue.allWorkingOn(function(err, data){ + Object.keys(data).length.should.equal(1); + data.workerB.should.equal('started'); + done(); + }); + }); + }); + }); + }); + + queue.enqueue(specHelper.queue, "slowJob"); + workerA.start(); + }); + + it('will not remove stuck jobs within the timelimit', function(done){ + var age = 999; + var listener = workerA.on('job', function(q, job, failure){ + workerA.removeAllListeners('job'); + + queue.cleanOldWorkers(age, function(err, data){ + should.not.exist(err); + Object.keys(data).length.should.equal(0); + queue.allWorkingOn(function(err, data){ + var paylaod = data['workerA'].payload; + paylaod.queue.should.equal('test_queue'); + paylaod.class.should.equal('slowJob'); + + done(); + }); + }); + }); + + queue.enqueue(specHelper.queue, "slowJob"); + workerA.start(); + }); + }); });