lib/sidekiq/api.rb in sidekiq-7.0.0.beta1 vs lib/sidekiq/api.rb in sidekiq-7.0.0
- old
+ new
@@ -1032,24 +1032,24 @@
class WorkSet
include Enumerable
def each(&block)
results = []
+ procs = nil
+ all_works = nil
+
Sidekiq.redis do |conn|
- procs = conn.sscan("processes").to_a
- procs.sort.each do |key|
- valid, workers = conn.pipelined { |pipeline|
- pipeline.exists(key)
+ procs = conn.sscan("processes").to_a.sort
+ all_works = conn.pipelined do |pipeline|
+ procs.each do |key|
pipeline.hgetall("#{key}:work")
- }
- next unless valid > 0
- workers.each_pair do |tid, json|
- hsh = Sidekiq.load_json(json)
- p = hsh["payload"]
- # avoid breaking API, this is a side effect of the JSON optimization in #4316
- hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String)
- results << [key, tid, hsh]
end
+ end
+ end
+
+ procs.zip(all_works).each do |key, workers|
+ workers.each_pair do |tid, json|
+ results << [key, tid, Sidekiq.load_json(json)] unless json.empty?
end
end
results.sort_by { |(_, _, hsh)| hsh["run_at"] }.each(&block)
end