lib/sidekiq/limit_fetch/global/semaphore.rb in sidekiq-limit_fetch-2.1.2 vs lib/sidekiq/limit_fetch/global/semaphore.rb in sidekiq-limit_fetch-2.1.3

- old
+ new

@@ -40,10 +40,14 @@ def busy redis {|it| it.llen "#{PREFIX}:busy:#@name" } end + def busy_processes + redis {|it| it.lrange "#{PREFIX}:busy:#@name", 0, -1 } + end + def increase_busy increase_local_busy redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid } end @@ -54,10 +58,14 @@ def probed redis {|it| it.llen "#{PREFIX}:probed:#@name" } end + def probed_processes + redis {|it| it.lrange "#{PREFIX}:probed:#@name", 0, -1 } + end + def pause redis {|it| it.set "#{PREFIX}:pause:#@name", true } end def unpause @@ -93,8 +101,40 @@ @lock.synchronize { @local_busy -= 1 } end def local_busy? @local_busy > 0 + end + + def explain + <<-END.gsub(/^ {8}/, '') + Current sidekiq process: #{Selector.uuid} + + All processes: + #{Monitor.all_processes.join "\n"} + + Stale processes: + #{Monitor.old_processes.join "\n"} + + Locked queue processes: + #{probed_processes.sort.join "\n"} + + Busy queue processes: + #{busy_processes.sort.join "\n"} + END + end + + def remove_locks_except!(processes) + locked_processes = probed_processes.uniq + (locked_processes - processes).each do |dead_process| + remove_lock! dead_process + end + end + + def remove_lock!(process) + redis do |it| + it.lrem "#{PREFIX}:probed:#@name", 0, process + it.lrem "#{PREFIX}:busy:#@name", 0, process + end end end end