Skip to content

Commit

Permalink
Merge pull request #121 from taskrabbit/delayed-set
Browse files Browse the repository at this point in the history
use proper delayed set queue name
  • Loading branch information
evantahler committed Feb 25, 2016
2 parents 5ea9315 + 57ae6e7 commit 85b00f7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
4 changes: 2 additions & 2 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var queue = function(options, jobs){
self.jobs = jobs;

self.connection = new connection(options.connection);

self.connection.on('error', function(error){
self.emit('error', error);
});
Expand Down Expand Up @@ -80,7 +80,7 @@ queue.prototype.enqueueAt = function(timestamp, q, func, args, callback){
// enqueue the encoded job into a list per timestmp to be popped and workered later
self.connection.redis.rpush(self.connection.key("delayed:" + rTimestamp), item, function(){
// save the job + args into a set so that it can be checked by plugins
self.connection.redis.sadd(self.connection.key("timestamps:" + item), self.connection.key("delayed:" + rTimestamp), function(){
self.connection.redis.sadd(self.connection.key("timestamps:" + item), "delayed:" + rTimestamp, function(){
// and the timestamp in question to a zset to the scheduler will know which timestamps have data to work
self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){
if(typeof callback === 'function'){ callback(); }
Expand Down
20 changes: 10 additions & 10 deletions lib/scheduler.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// To read notes about the master locking scheme, check out:
// To read notes about the master locking scheme, check out:
// https://github.com/resque/resque-scheduler/blob/master/lib/resque/scheduler/locking.rb

var EventEmitter = require('events').EventEmitter;
Expand All @@ -11,7 +11,7 @@ var scheduler = function(options, jobs){
var self = this;
if(!jobs){ jobs = {}; }
var defaults = self.defaults();

for(var i in defaults){
if(options[i] === null || options[i] === undefined){
options[i] = defaults[i];
Expand All @@ -22,7 +22,7 @@ var scheduler = function(options, jobs){
self.name = self.options.name;
self.master = false;
self.running = false;
self.processing = false;
self.processing = false;

self.queue = new queue({connection: options.connection}, jobs);

Expand Down Expand Up @@ -132,24 +132,24 @@ scheduler.prototype.pollAgainLater = function(){

scheduler.prototype.tryForMaster = function(callback) {
var self = this;

if(!self.connection || !self.connection.redis) {
return callback();
}

var masterKey = self.connection.key('resque_scheduler_master_lock');
self.connection.redis.setnx(masterKey, self.options.name, function(error, locked){
if(error){ return callback(error); }
else if(locked === true || locked === 1){
else if(locked === true || locked === 1){
self.connection.redis.expire(masterKey, self.options.masterLockTimeout, function(error){
return callback(error, true);
return callback(error, true);
});
}else{
self.connection.redis.get(masterKey, function(error, value){
if(error){ return callback(error); }
else if(value === self.options.name){
self.connection.redis.expire(masterKey, self.options.masterLockTimeout, function(error){
return callback(error, true);
return callback(error, true);
});
}else{
return callback(null, false);
Expand All @@ -166,11 +166,11 @@ scheduler.prototype.releaseMasterLock = function(callback){
self.tryForMaster(function(error, isMaster){
if(error){ return callback(error); }
else if(!isMaster){ return callback(null, false); }
else{
else{
self.connection.redis.del(masterKey, function(error, delted){
self.master = false;
callback(error, (delted === 1));
});
});
}
});
}else{
Expand Down Expand Up @@ -210,7 +210,7 @@ scheduler.prototype.nextItemForTimestamp = function(timestamp, callback) {
if(error){
callback(error);
}else{
self.connection.redis.srem(self.connection.key("timestamps:" + job), key, function(error){
self.connection.redis.srem(self.connection.key("timestamps:" + job), ('delayed:' + timestamp), function(error){
self.cleanupTimestamp(timestamp, function(){
if (error) {
callback(error);
Expand Down

0 comments on commit 85b00f7

Please sign in to comment.