From 57ae6e7944af53070c058a2d3616a44a0645c0b3 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Thu, 25 Feb 2016 12:18:42 -0800 Subject: [PATCH] use proper delayed set queue name --- lib/queue.js | 4 ++-- lib/scheduler.js | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/queue.js b/lib/queue.js index 041eb40b..0716e1c7 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -11,7 +11,7 @@ var queue = function(options, jobs){ self.jobs = jobs; self.connection = new connection(options.connection); - + self.connection.on('error', function(error){ self.emit('error', error); }); @@ -80,7 +80,7 @@ queue.prototype.enqueueAt = function(timestamp, q, func, args, callback){ // enqueue the encoded job into a list per timestmp to be popped and workered later self.connection.redis.rpush(self.connection.key("delayed:" + rTimestamp), item, function(){ // save the job + args into a set so that it can be checked by plugins - self.connection.redis.sadd(self.connection.key("timestamps:" + item), self.connection.key("delayed:" + rTimestamp), function(){ + self.connection.redis.sadd(self.connection.key("timestamps:" + item), "delayed:" + rTimestamp, function(){ // and the timestamp in question to a zset to the scheduler will know which timestamps have data to work self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){ if(typeof callback === 'function'){ callback(); } diff --git a/lib/scheduler.js b/lib/scheduler.js index 83ccf48b..7c37b6bc 100644 --- a/lib/scheduler.js +++ b/lib/scheduler.js @@ -1,4 +1,4 @@ -// To read notes about the master locking scheme, check out: +// To read notes about the master locking scheme, check out: // https://github.com/resque/resque-scheduler/blob/master/lib/resque/scheduler/locking.rb var EventEmitter = require('events').EventEmitter; @@ -11,7 +11,7 @@ var scheduler = function(options, jobs){ var self = this; if(!jobs){ jobs = {}; } var defaults = self.defaults(); - + for(var i in defaults){ if(options[i] === null || options[i] === undefined){ options[i] = defaults[i]; @@ -22,7 +22,7 @@ var scheduler = function(options, jobs){ self.name = self.options.name; self.master = false; self.running = false; - self.processing = false; + self.processing = false; self.queue = new queue({connection: options.connection}, jobs); @@ -132,7 +132,7 @@ scheduler.prototype.pollAgainLater = function(){ scheduler.prototype.tryForMaster = function(callback) { var self = this; - + if(!self.connection || !self.connection.redis) { return callback(); } @@ -140,16 +140,16 @@ scheduler.prototype.tryForMaster = function(callback) { var masterKey = self.connection.key('resque_scheduler_master_lock'); self.connection.redis.setnx(masterKey, self.options.name, function(error, locked){ if(error){ return callback(error); } - else if(locked === true || locked === 1){ + else if(locked === true || locked === 1){ self.connection.redis.expire(masterKey, self.options.masterLockTimeout, function(error){ - return callback(error, true); + return callback(error, true); }); }else{ self.connection.redis.get(masterKey, function(error, value){ if(error){ return callback(error); } else if(value === self.options.name){ self.connection.redis.expire(masterKey, self.options.masterLockTimeout, function(error){ - return callback(error, true); + return callback(error, true); }); }else{ return callback(null, false); @@ -166,11 +166,11 @@ scheduler.prototype.releaseMasterLock = function(callback){ self.tryForMaster(function(error, isMaster){ if(error){ return callback(error); } else if(!isMaster){ return callback(null, false); } - else{ + else{ self.connection.redis.del(masterKey, function(error, delted){ self.master = false; callback(error, (delted === 1)); - }); + }); } }); }else{ @@ -210,7 +210,7 @@ scheduler.prototype.nextItemForTimestamp = function(timestamp, callback) { if(error){ callback(error); }else{ - self.connection.redis.srem(self.connection.key("timestamps:" + job), key, function(error){ + self.connection.redis.srem(self.connection.key("timestamps:" + job), ('delayed:' + timestamp), function(error){ self.cleanupTimestamp(timestamp, function(){ if (error) { callback(error);