lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.2.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.2.1
- old
+ new
@@ -154,11 +154,11 @@
# Given a block, remove jobs that return true from a block
#
# This allows for removal of delayed jobs that have arguments matching
# certain criteria
def remove_delayed_selection(klass = nil)
- fail ArgumentError, 'Please supply a block' unless block_given?
+ raise ArgumentError, 'Please supply a block' unless block_given?
found_jobs = find_delayed_selection(klass) { |args| yield(args) }
found_jobs.reduce(0) do |sum, encoded_job|
sum + remove_delayed_job(encoded_job)
end
@@ -167,11 +167,11 @@
# Given a block, enqueue jobs now that return true from a block
#
# This allows for enqueuing of delayed jobs that have arguments matching
# certain criteria
def enqueue_delayed_selection(klass = nil)
- fail ArgumentError, 'Please supply a block' unless block_given?
+ raise ArgumentError, 'Please supply a block' unless block_given?
found_jobs = find_delayed_selection(klass) { |args| yield(args) }
found_jobs.reduce(0) do |sum, encoded_job|
decoded_job = decode(encoded_job)
klass = Util.constantize(decoded_job['class'])
@@ -182,27 +182,27 @@
# Given a block, find jobs that return true from a block
#
# This allows for finding of delayed jobs that have arguments matching
# certain criteria
def find_delayed_selection(klass = nil, &block)
- fail ArgumentError, 'Please supply a block' unless block_given?
+ raise ArgumentError, 'Please supply a block' unless block_given?
- found_jobs = []
- start = nil
- while start = search_first_delayed_timestamp_in_range(start, nil)
- job = "delayed:#{start}"
- start += 1
- index = Resque.redis.llen(job) - 1
- while index >= 0
- payload = Resque.redis.lindex(job, index)
- decoded_payload = decode(payload)
- if payload_matches_selection?(decoded_payload, klass, &block)
- found_jobs.push(payload)
+ timestamps = redis.zrange(:delayed_queue_schedule, 0, -1)
+
+ # Beyond 100 there's almost no improvement in speed
+ found = timestamps.each_slice(100).map do |ts_group|
+ jobs = redis.pipelined do |r|
+ ts_group.each do |ts|
+ r.lrange("delayed:#{ts}", 0, -1)
end
- index -= 1
end
+
+ jobs.flatten.select do |payload|
+ payload_matches_selection?(decode(payload), klass, &block)
+ end
end
- found_jobs
+
+ found.flatten
end
# Given a timestamp and job (klass + args) it removes all instances and
# returns the count of jobs removed.
#