From bb6bdcf4d80479163bafb5623f3aee7dccacf1d2 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Wed, 4 Mar 2015 19:34:30 +0000 Subject: [PATCH 1/2] cleanOldWorkers --- lib/queue.js | 52 +++++++++++++++++++++++++++++++++++--- test/core/queue.js | 63 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 111 insertions(+), 4 deletions(-) diff --git a/lib/queue.js b/lib/queue.js index 5039758c..52419bf8 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -317,6 +317,46 @@ queue.prototype.allWorkingOn = function(callback){ }); }; +queue.prototype.forceCleanWorker = function(workerName, callback){ + var self = this; + self.workers(function(err, workers){ + var queues = workers[workerName]; + var errorPayload; + if(err){ callback(err); } + else if(!queues){ callback(new Error('worker not round')); } + else{ + self.workingOn(workerName, queues, function(err, workingOn){ + if(err){ callback(err); } + else if(workingOn){ + workingOn = JSON.parse(workingOn); + errorPayload = { + worker: workerName, + queue: workingOn.queue, + payload: workingOn.payload, + exception: 'Worker Timeout (killed manually)', + error: 'Worker Timeout (killed manually)', + backtrace: null, + failed_at: (new Date()).toString() + }; + self.connection.redis.incr(self.connection.key('stat', 'failed')); + self.connection.redis.incr(self.connection.key('stat', 'failed', workerName)); + self.connection.redis.rpush(self.connection.key('failed'), JSON.stringify(errorPayload)); + } + + self.connection.redis.del([ + self.connection.key('stat', 'failed', workerName), + self.connection.key('stat', 'processed', workerName), + self.connection.key('worker', workerName), + self.connection.redis.srem(self.connection.key('workers'), workerName + ':' + queues) + ], function(err, data){ + callback(err, errorPayload); + }); + + }); + } + }); +}; + queue.prototype.cleanOldWorkers = function(age, queue, callback){ // note: this method will remove the data created by a "stuck" worker and move the payload to the error queue // however, it will not actually remove any processes which may be running. A job *may* be running that you have removed @@ -329,10 +369,16 @@ queue.prototype.cleanOldWorkers = function(age, queue, callback){ callback(null, results); }else{ var started = 0; - for(var worker in data){ + for(var workerName in data){ started++; - if(Date.now() - Date.parse(data.run_at) > age){ - // ** HERE ** + if(Date.now() - Date.parse(data[workerName].run_at) > age){ + self.forceCleanWorker(workerName, function(error, errorPayload){ + if(errorPayload && errorPayload.worker ){ results[errorPayload.worker] = errorPayload; } + started--; + if(started === 0 && typeof callback === 'function'){ + callback(null, results); + } + }); }else{ process.nextTick(function(){ started--; diff --git a/test/core/queue.js b/test/core/queue.js index 7e1f8abf..b1ddd76d 100644 --- a/test/core/queue.js +++ b/test/core/queue.js @@ -297,7 +297,7 @@ describe('queue', function(){ describe('worker status', function(){ var workerA; var workerB; - var timeout = 100; + var timeout = 500; var jobs = { "slowJob": { @@ -374,6 +374,67 @@ describe('queue', function(){ queue.enqueue(specHelper.queue, "slowJob"); workerA.start(); }); + + it('can remove stuck workers', function(done){ + var age = 1; + var listener = workerA.on('job', function(q, job, failure){ + workerA.removeAllListeners('job'); + + queue.allWorkingOn(function(err, data){ + var paylaod = data['workerA'].payload; + paylaod.queue.should.equal('test_queue'); + paylaod.class.should.equal('slowJob'); + + queue.cleanOldWorkers(age, specHelper.queue, function(err, data){ + should.not.exist(err); + Object.keys(data).length.should.equal(1); + data.workerA.queue.should.equal('test_queue'); + data.workerA.worker.should.equal('workerA'); + data.workerA.payload.class.should.equal('slowJob'); + + specHelper.redis.rpop(specHelper.namespace + ":" + "failed", function(err, data){ + data = JSON.parse(data); + data.queue.should.equal(specHelper.queue); + data.exception.should.equal('Worker Timeout (killed manually)'); + data.error.should.equal('Worker Timeout (killed manually)'); + data.payload.class.should.equal('slowJob'); + + queue.allWorkingOn(function(err, data){ + Object.keys(data).length.should.equal(1); + data.workerB.should.equal('started'); + done(); + }); + }); + }); + }); + }); + + queue.enqueue(specHelper.queue, "slowJob"); + workerA.start(); + }); + + it('will not remove stuck jobs within the timelimit', function(done){ + var age = 999; + var listener = workerA.on('job', function(q, job, failure){ + workerA.removeAllListeners('job'); + + queue.cleanOldWorkers(age, specHelper.queue, function(err, data){ + should.not.exist(err); + Object.keys(data).length.should.equal(0); + queue.allWorkingOn(function(err, data){ + var paylaod = data['workerA'].payload; + paylaod.queue.should.equal('test_queue'); + paylaod.class.should.equal('slowJob'); + + done(); + }); + }); + }); + + queue.enqueue(specHelper.queue, "slowJob"); + workerA.start(); + }); + }); }); From 884a23c32c08f73e1262d585641de362bfced807 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Wed, 4 Mar 2015 19:38:15 +0000 Subject: [PATCH 2/2] cleanOldWorkers --- lib/queue.js | 2 +- test/core/queue.js | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/queue.js b/lib/queue.js index 52419bf8..1b9f6a05 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -357,7 +357,7 @@ queue.prototype.forceCleanWorker = function(workerName, callback){ }); }; -queue.prototype.cleanOldWorkers = function(age, queue, callback){ +queue.prototype.cleanOldWorkers = function(age, callback){ // note: this method will remove the data created by a "stuck" worker and move the payload to the error queue // however, it will not actually remove any processes which may be running. A job *may* be running that you have removed var self = this; diff --git a/test/core/queue.js b/test/core/queue.js index b1ddd76d..96b8c202 100644 --- a/test/core/queue.js +++ b/test/core/queue.js @@ -385,7 +385,7 @@ describe('queue', function(){ paylaod.queue.should.equal('test_queue'); paylaod.class.should.equal('slowJob'); - queue.cleanOldWorkers(age, specHelper.queue, function(err, data){ + queue.cleanOldWorkers(age, function(err, data){ should.not.exist(err); Object.keys(data).length.should.equal(1); data.workerA.queue.should.equal('test_queue'); @@ -418,14 +418,14 @@ describe('queue', function(){ var listener = workerA.on('job', function(q, job, failure){ workerA.removeAllListeners('job'); - queue.cleanOldWorkers(age, specHelper.queue, function(err, data){ + queue.cleanOldWorkers(age, function(err, data){ should.not.exist(err); Object.keys(data).length.should.equal(0); queue.allWorkingOn(function(err, data){ var paylaod = data['workerA'].payload; paylaod.queue.should.equal('test_queue'); paylaod.class.should.equal('slowJob'); - + done(); }); });