lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.0.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.1.0
- old
+ new
@@ -156,32 +156,54 @@
# This allows for removal of delayed jobs that have arguments matching
# certain criteria
def remove_delayed_selection(klass = nil)
fail ArgumentError, 'Please supply a block' unless block_given?
- destroyed = 0
+ found_jobs = find_delayed_selection(klass) { |args| yield(args) }
+ found_jobs.reduce(0) do |sum, encoded_job|
+ sum + remove_delayed_job(encoded_job)
+ end
+ end
+
+ # Given a block, enqueue jobs now that return true from a block
+ #
+ # This allows for enqueuing of delayed jobs that have arguments matching
+ # certain criteria
+ def enqueue_delayed_selection(klass = nil)
+ fail ArgumentError, 'Please supply a block' unless block_given?
+
+ found_jobs = find_delayed_selection(klass) { |args| yield(args) }
+ found_jobs.reduce(0) do |sum, encoded_job|
+ decoded_job = decode(encoded_job)
+ klass = Util.constantize(decoded_job['class'])
+ sum + enqueue_delayed(klass, *decoded_job['args'])
+ end
+ end
+
+ # Given a block, find jobs that return true from a block
+ #
+ # This allows for finding of delayed jobs that have arguments matching
+ # certain criteria
+ def find_delayed_selection(klass = nil, &block)
+ fail ArgumentError, 'Please supply a block' unless block_given?
+
+ found_jobs = []
start = nil
while start = search_first_delayed_timestamp_in_range(start, nil)
job = "delayed:#{start}"
start += 1
index = Resque.redis.llen(job) - 1
while index >= 0
payload = Resque.redis.lindex(job, index)
decoded_payload = decode(payload)
- job_class = decoded_payload['class']
- relevant_class = (klass.nil? || klass.to_s == job_class)
- if relevant_class && yield(decoded_payload['args'])
- removed = remove_delayed_job(payload)
- destroyed += removed
- index -= removed
- else
- index -= 1
+ if payload_matches_selection?(decoded_payload, klass, &block)
+ found_jobs.push(payload)
end
+ index -= 1
end
end
-
- destroyed
+ found_jobs
end
# Given a timestamp and job (klass + args) it removes all instances and
# returns the count of jobs removed.
#
@@ -279,9 +301,16 @@
:delayed_queue_schedule, start_at, stop_at,
limit: [0, 1]
)
timestamp = items.nil? ? nil : Array(items).first
timestamp.to_i unless timestamp.nil?
+ end
+
+ def payload_matches_selection?(decoded_payload, klass)
+ return false if decoded_payload.nil?
+ job_class = decoded_payload['class']
+ relevant_class = (klass.nil? || klass.to_s == job_class)
+ relevant_class && yield(decoded_payload['args'])
end
def plugin
Resque::Scheduler::Plugin
end