lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.0.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.1.0

- old
+ new

@@ -156,32 +156,54 @@ # This allows for removal of delayed jobs that have arguments matching # certain criteria def remove_delayed_selection(klass = nil) fail ArgumentError, 'Please supply a block' unless block_given? - destroyed = 0 + found_jobs = find_delayed_selection(klass) { |args| yield(args) } + found_jobs.reduce(0) do |sum, encoded_job| + sum + remove_delayed_job(encoded_job) + end + end + + # Given a block, enqueue jobs now that return true from a block + # + # This allows for enqueuing of delayed jobs that have arguments matching + # certain criteria + def enqueue_delayed_selection(klass = nil) + fail ArgumentError, 'Please supply a block' unless block_given? + + found_jobs = find_delayed_selection(klass) { |args| yield(args) } + found_jobs.reduce(0) do |sum, encoded_job| + decoded_job = decode(encoded_job) + klass = Util.constantize(decoded_job['class']) + sum + enqueue_delayed(klass, *decoded_job['args']) + end + end + + # Given a block, find jobs that return true from a block + # + # This allows for finding of delayed jobs that have arguments matching + # certain criteria + def find_delayed_selection(klass = nil, &block) + fail ArgumentError, 'Please supply a block' unless block_given? + + found_jobs = [] start = nil while start = search_first_delayed_timestamp_in_range(start, nil) job = "delayed:#{start}" start += 1 index = Resque.redis.llen(job) - 1 while index >= 0 payload = Resque.redis.lindex(job, index) decoded_payload = decode(payload) - job_class = decoded_payload['class'] - relevant_class = (klass.nil? || klass.to_s == job_class) - if relevant_class && yield(decoded_payload['args']) - removed = remove_delayed_job(payload) - destroyed += removed - index -= removed - else - index -= 1 + if payload_matches_selection?(decoded_payload, klass, &block) + found_jobs.push(payload) end + index -= 1 end end - - destroyed + found_jobs end # Given a timestamp and job (klass + args) it removes all instances and # returns the count of jobs removed. # @@ -279,9 +301,16 @@ :delayed_queue_schedule, start_at, stop_at, limit: [0, 1] ) timestamp = items.nil? ? nil : Array(items).first timestamp.to_i unless timestamp.nil? + end + + def payload_matches_selection?(decoded_payload, klass) + return false if decoded_payload.nil? + job_class = decoded_payload['class'] + relevant_class = (klass.nil? || klass.to_s == job_class) + relevant_class && yield(decoded_payload['args']) end def plugin Resque::Scheduler::Plugin end