lib/sidekiq/limit_fetch/global/semaphore.rb in sidekiq-limit_fetch-2.1.2 vs lib/sidekiq/limit_fetch/global/semaphore.rb in sidekiq-limit_fetch-2.1.3
- old
+ new
@@ -40,10 +40,14 @@
def busy
redis {|it| it.llen "#{PREFIX}:busy:#@name" }
end
+ def busy_processes
+ redis {|it| it.lrange "#{PREFIX}:busy:#@name", 0, -1 }
+ end
+
def increase_busy
increase_local_busy
redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid }
end
@@ -54,10 +58,14 @@
def probed
redis {|it| it.llen "#{PREFIX}:probed:#@name" }
end
+ def probed_processes
+ redis {|it| it.lrange "#{PREFIX}:probed:#@name", 0, -1 }
+ end
+
def pause
redis {|it| it.set "#{PREFIX}:pause:#@name", true }
end
def unpause
@@ -93,8 +101,40 @@
@lock.synchronize { @local_busy -= 1 }
end
def local_busy?
@local_busy > 0
+ end
+
+ def explain
+ <<-END.gsub(/^ {8}/, '')
+ Current sidekiq process: #{Selector.uuid}
+
+ All processes:
+ #{Monitor.all_processes.join "\n"}
+
+ Stale processes:
+ #{Monitor.old_processes.join "\n"}
+
+ Locked queue processes:
+ #{probed_processes.sort.join "\n"}
+
+ Busy queue processes:
+ #{busy_processes.sort.join "\n"}
+ END
+ end
+
+ def remove_locks_except!(processes)
+ locked_processes = probed_processes.uniq
+ (locked_processes - processes).each do |dead_process|
+ remove_lock! dead_process
+ end
+ end
+
+ def remove_lock!(process)
+ redis do |it|
+ it.lrem "#{PREFIX}:probed:#@name", 0, process
+ it.lrem "#{PREFIX}:busy:#@name", 0, process
+ end
end
end
end