lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-3.0.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-3.1.0

- old
+ new

@@ -103,16 +103,11 @@ end # Returns the next delayed queue timestamp # (don't call directly) def next_delayed_timestamp(at_time = nil) - items = redis.zrangebyscore( - :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, - limit: [0, 1] - ) - timestamp = items.nil? ? nil : Array(items).first - timestamp.to_i unless timestamp.nil? + search_first_delayed_timestamp_in_range(nil, at_time || Time.now) end # Returns the next item to be processed for a given timestamp, nil if # done. (don't call directly) # +timestamp+ can either be in seconds or a datetime @@ -143,21 +138,11 @@ end # Given an encoded item, remove it from the delayed_queue def remove_delayed(klass, *args) search = encode(job_to_hash(klass, args)) - timestamps = redis.smembers("timestamps:#{search}") - - replies = redis.pipelined do - timestamps.each do |key| - redis.lrem(key, 0, search) - redis.srem("timestamps:#{search}", key) - end - end - - return 0 if replies.nil? || replies.empty? - replies.each_slice(2).map(&:first).inject(:+) + remove_delayed_job(search) end # Given an encoded item, enqueue it now def enqueue_delayed(klass, *args) hash = job_to_hash(klass, args) @@ -168,32 +153,34 @@ # Given a block, remove jobs that return true from a block # # This allows for removal of delayed jobs that have arguments matching # certain criteria - def remove_delayed_selection + def remove_delayed_selection(klass = nil) fail ArgumentError, 'Please supply a block' unless block_given? destroyed = 0 - # There is no way to search Redis list entries for a partial match, - # so we query for all delayed job tasks and do our matching after - # decoding the payload data - jobs = Resque.redis.keys('delayed:*') - jobs.each do |job| + start = nil + while start = search_first_delayed_timestamp_in_range(start, nil) + job = "delayed:#{start}" + start += 1 index = Resque.redis.llen(job) - 1 while index >= 0 payload = Resque.redis.lindex(job, index) decoded_payload = decode(payload) - if yield(decoded_payload['args']) - removed = redis.lrem job, 0, payload + job_class = decoded_payload['class'] + relevant_class = (klass.nil? || klass.to_s == job_class) + if relevant_class && yield(decoded_payload['args']) + removed = remove_delayed_job(payload) destroyed += removed index -= removed else index -= 1 end end end + destroyed end # Given a timestamp and job (klass + args) it removes all instances and # returns the count of jobs removed. @@ -252,10 +239,24 @@ def job_to_hash_with_queue(queue, klass, args) { class: klass.to_s, args: args, queue: queue } end + def remove_delayed_job(encoded_job) + timestamps = redis.smembers("timestamps:#{encoded_job}") + + replies = redis.pipelined do + timestamps.each do |key| + redis.lrem(key, 0, encoded_job) + redis.srem("timestamps:#{encoded_job}", key) + end + end + + return 0 if replies.nil? || replies.empty? + replies.each_slice(2).map(&:first).inject(:+) + end + def clean_up_timestamp(key, timestamp) # If the list is empty, remove it. # Use a watch here to ensure nobody adds jobs to this delayed # queue while we're removing it. @@ -266,9 +267,21 @@ redis.zrem :delayed_queue_schedule, timestamp.to_i end else redis.unwatch end + end + + def search_first_delayed_timestamp_in_range(start_at, stop_at) + start_at = start_at.nil? ? '-inf' : start_at.to_i + stop_at = stop_at.nil? ? '+inf' : stop_at.to_i + + items = redis.zrangebyscore( + :delayed_queue_schedule, start_at, stop_at, + limit: [0, 1] + ) + timestamp = items.nil? ? nil : Array(items).first + timestamp.to_i unless timestamp.nil? end def plugin Resque::Scheduler::Plugin end