lib/sidekiq/api.rb in sidekiq-7.0.0.beta1 vs lib/sidekiq/api.rb in sidekiq-7.0.0

- old
+ new

@@ -1032,24 +1032,24 @@ class WorkSet include Enumerable def each(&block) results = [] + procs = nil + all_works = nil + Sidekiq.redis do |conn| - procs = conn.sscan("processes").to_a - procs.sort.each do |key| - valid, workers = conn.pipelined { |pipeline| - pipeline.exists(key) + procs = conn.sscan("processes").to_a.sort + all_works = conn.pipelined do |pipeline| + procs.each do |key| pipeline.hgetall("#{key}:work") - } - next unless valid > 0 - workers.each_pair do |tid, json| - hsh = Sidekiq.load_json(json) - p = hsh["payload"] - # avoid breaking API, this is a side effect of the JSON optimization in #4316 - hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String) - results << [key, tid, hsh] end + end + end + + procs.zip(all_works).each do |key, workers| + workers.each_pair do |tid, json| + results << [key, tid, Sidekiq.load_json(json)] unless json.empty? end end results.sort_by { |(_, _, hsh)| hsh["run_at"] }.each(&block) end