lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.2.0 vs lib/resque/scheduler/delaying_extensions.rb in resque-scheduler-4.2.1

- old
+ new

@@ -154,11 +154,11 @@ # Given a block, remove jobs that return true from a block # # This allows for removal of delayed jobs that have arguments matching # certain criteria def remove_delayed_selection(klass = nil) - fail ArgumentError, 'Please supply a block' unless block_given? + raise ArgumentError, 'Please supply a block' unless block_given? found_jobs = find_delayed_selection(klass) { |args| yield(args) } found_jobs.reduce(0) do |sum, encoded_job| sum + remove_delayed_job(encoded_job) end @@ -167,11 +167,11 @@ # Given a block, enqueue jobs now that return true from a block # # This allows for enqueuing of delayed jobs that have arguments matching # certain criteria def enqueue_delayed_selection(klass = nil) - fail ArgumentError, 'Please supply a block' unless block_given? + raise ArgumentError, 'Please supply a block' unless block_given? found_jobs = find_delayed_selection(klass) { |args| yield(args) } found_jobs.reduce(0) do |sum, encoded_job| decoded_job = decode(encoded_job) klass = Util.constantize(decoded_job['class']) @@ -182,27 +182,27 @@ # Given a block, find jobs that return true from a block # # This allows for finding of delayed jobs that have arguments matching # certain criteria def find_delayed_selection(klass = nil, &block) - fail ArgumentError, 'Please supply a block' unless block_given? + raise ArgumentError, 'Please supply a block' unless block_given? - found_jobs = [] - start = nil - while start = search_first_delayed_timestamp_in_range(start, nil) - job = "delayed:#{start}" - start += 1 - index = Resque.redis.llen(job) - 1 - while index >= 0 - payload = Resque.redis.lindex(job, index) - decoded_payload = decode(payload) - if payload_matches_selection?(decoded_payload, klass, &block) - found_jobs.push(payload) + timestamps = redis.zrange(:delayed_queue_schedule, 0, -1) + + # Beyond 100 there's almost no improvement in speed + found = timestamps.each_slice(100).map do |ts_group| + jobs = redis.pipelined do |r| + ts_group.each do |ts| + r.lrange("delayed:#{ts}", 0, -1) end - index -= 1 end + + jobs.flatten.select do |payload| + payload_matches_selection?(decode(payload), klass, &block) + end end - found_jobs + + found.flatten end # Given a timestamp and job (klass + args) it removes all instances and # returns the count of jobs removed. #