lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-3.0.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-3.1.0
- old
+ new
@@ -103,16 +103,11 @@
end
# Returns the next delayed queue timestamp
# (don't call directly)
def next_delayed_timestamp(at_time = nil)
- items = redis.zrangebyscore(
- :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i,
- limit: [0, 1]
- )
- timestamp = items.nil? ? nil : Array(items).first
- timestamp.to_i unless timestamp.nil?
+ search_first_delayed_timestamp_in_range(nil, at_time || Time.now)
end
# Returns the next item to be processed for a given timestamp, nil if
# done. (don't call directly)
# +timestamp+ can either be in seconds or a datetime
@@ -143,21 +138,11 @@
end
# Given an encoded item, remove it from the delayed_queue
def remove_delayed(klass, *args)
search = encode(job_to_hash(klass, args))
- timestamps = redis.smembers("timestamps:#{search}")
-
- replies = redis.pipelined do
- timestamps.each do |key|
- redis.lrem(key, 0, search)
- redis.srem("timestamps:#{search}", key)
- end
- end
-
- return 0 if replies.nil? || replies.empty?
- replies.each_slice(2).map(&:first).inject(:+)
+ remove_delayed_job(search)
end
# Given an encoded item, enqueue it now
def enqueue_delayed(klass, *args)
hash = job_to_hash(klass, args)
@@ -168,32 +153,34 @@
# Given a block, remove jobs that return true from a block
#
# This allows for removal of delayed jobs that have arguments matching
# certain criteria
- def remove_delayed_selection
+ def remove_delayed_selection(klass = nil)
fail ArgumentError, 'Please supply a block' unless block_given?
destroyed = 0
- # There is no way to search Redis list entries for a partial match,
- # so we query for all delayed job tasks and do our matching after
- # decoding the payload data
- jobs = Resque.redis.keys('delayed:*')
- jobs.each do |job|
+ start = nil
+ while start = search_first_delayed_timestamp_in_range(start, nil)
+ job = "delayed:#{start}"
+ start += 1
index = Resque.redis.llen(job) - 1
while index >= 0
payload = Resque.redis.lindex(job, index)
decoded_payload = decode(payload)
- if yield(decoded_payload['args'])
- removed = redis.lrem job, 0, payload
+ job_class = decoded_payload['class']
+ relevant_class = (klass.nil? || klass.to_s == job_class)
+ if relevant_class && yield(decoded_payload['args'])
+ removed = remove_delayed_job(payload)
destroyed += removed
index -= removed
else
index -= 1
end
end
end
+
destroyed
end
# Given a timestamp and job (klass + args) it removes all instances and
# returns the count of jobs removed.
@@ -252,10 +239,24 @@
def job_to_hash_with_queue(queue, klass, args)
{ class: klass.to_s, args: args, queue: queue }
end
+ def remove_delayed_job(encoded_job)
+ timestamps = redis.smembers("timestamps:#{encoded_job}")
+
+ replies = redis.pipelined do
+ timestamps.each do |key|
+ redis.lrem(key, 0, encoded_job)
+ redis.srem("timestamps:#{encoded_job}", key)
+ end
+ end
+
+ return 0 if replies.nil? || replies.empty?
+ replies.each_slice(2).map(&:first).inject(:+)
+ end
+
def clean_up_timestamp(key, timestamp)
# If the list is empty, remove it.
# Use a watch here to ensure nobody adds jobs to this delayed
# queue while we're removing it.
@@ -266,9 +267,21 @@
redis.zrem :delayed_queue_schedule, timestamp.to_i
end
else
redis.unwatch
end
+ end
+
+ def search_first_delayed_timestamp_in_range(start_at, stop_at)
+ start_at = start_at.nil? ? '-inf' : start_at.to_i
+ stop_at = stop_at.nil? ? '+inf' : stop_at.to_i
+
+ items = redis.zrangebyscore(
+ :delayed_queue_schedule, start_at, stop_at,
+ limit: [0, 1]
+ )
+ timestamp = items.nil? ? nil : Array(items).first
+ timestamp.to_i unless timestamp.nil?
end
def plugin
Resque::Scheduler::Plugin
end