lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.8.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.9.0

- old
+ new

@@ -88,11 +88,11 @@ def delayed_push(timestamp, item) # First add this item to the list for this timestamp redis.rpush("delayed:#{timestamp.to_i}", encode(item)) # Store the timestamps at with this item occurs - redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") + redis.sadd("timestamps:#{encode(item)}", ["delayed:#{timestamp.to_i}"]) # Now, add this timestamp to the zsets. The score and the value are # the same since we'll be querying by timestamp, and we don't have # anything else to store. redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i @@ -138,11 +138,11 @@ # +timestamp+ can either be in seconds or a datetime def next_item_for_timestamp(timestamp) key = "delayed:#{timestamp.to_i}" encoded_item = redis.lpop(key) - redis.srem("timestamps:#{encoded_item}", key) + redis.srem("timestamps:#{encoded_item}", [key]) item = decode(encoded_item) # If the list is empty, remove it. clean_up_timestamp(key, timestamp) item @@ -255,11 +255,11 @@ return 0 if Resque.inline? key = "delayed:#{timestamp.to_i}" encoded_job = encode(job_to_hash(klass, args)) - redis.srem("timestamps:#{encoded_job}", key) + redis.srem("timestamps:#{encoded_job}", [key]) count = redis.lrem(key, 0, encoded_job) clean_up_timestamp(key, timestamp) count end @@ -295,10 +295,26 @@ def get_last_enqueued_at(job_name) redis.hget('delayed:last_enqueued_at', job_name) end + def clean_up_timestamp(key, timestamp) + # Use a watch here to ensure nobody adds jobs to this delayed + # queue while we're removing it. + redis.watch(key) do + if redis.llen(key).to_i == 0 + # If the list is empty, remove it. + redis.multi do |transaction| + transaction.del(key) + transaction.zrem(:delayed_queue_schedule, timestamp.to_i) + end + else + redis.redis.unwatch + end + end + end + private def job_to_hash(klass, args) { class: klass.to_s, args: args, queue: queue_from_class(klass) } end @@ -315,34 +331,18 @@ timestamps = redis.smembers("timestamps:#{encoded_job}") replies = redis.pipelined do |pipeline| timestamps.each do |key| pipeline.lrem(key, 0, encoded_job) - pipeline.srem("timestamps:#{encoded_job}", key) + pipeline.srem("timestamps:#{encoded_job}", [key]) end end # timestamp key is not removed from the schedule, this is done later # by the scheduler loop return 0 if replies.nil? || replies.empty? replies.each_slice(2).map(&:first).inject(:+) - end - - def clean_up_timestamp(key, timestamp) - # Use a watch here to ensure nobody adds jobs to this delayed - # queue while we're removing it. - redis.watch(key) do - if redis.llen(key).to_i == 0 - # If the list is empty, remove it. - redis.multi do |transaction| - transaction.del(key) - transaction.zrem(:delayed_queue_schedule, timestamp.to_i) - end - else - redis.redis.unwatch - end - end end def search_first_delayed_timestamp_in_range(start_at, stop_at) start_at = start_at.nil? ? '-inf' : start_at.to_i stop_at = stop_at.nil? ? '+inf' : stop_at.to_i