lib/sidekiq/limit_fetch/global/semaphore.rb in sidekiq-limit_fetch-0.7 vs lib/sidekiq/limit_fetch/global/semaphore.rb in sidekiq-limit_fetch-0.8
- old
+ new
@@ -1,20 +1,23 @@
module Sidekiq::LimitFetch::Global
class Semaphore
+ extend Forwardable
+ def_delegator Sidekiq, :redis
+
PREFIX = 'limit_fetch'
def initialize(name)
@name = name
end
def limit
- value = Sidekiq.redis {|it| it.get "#{PREFIX}:limit:#@name" }
+ value = redis {|it| it.get "#{PREFIX}:limit:#@name" }
value.to_i if value
end
def limit=(value)
- Sidekiq.redis {|it| it.set "#{PREFIX}:limit:#@name", value }
+ redis {|it| it.set "#{PREFIX}:limit:#@name", value }
end
def acquire
Selector.acquire([@name]).size > 0
end
@@ -22,17 +25,33 @@
def release
Selector.release [@name]
end
def busy
- Sidekiq.redis {|it| it.llen "#{PREFIX}:busy:#@name" }
+ redis {|it| it.llen "#{PREFIX}:busy:#@name" }
end
def pause
- Sidekiq.redis {|it| it.set "#{PREFIX}:pause:#@name", true }
+ redis {|it| it.set "#{PREFIX}:pause:#@name", true }
end
- def continue
- Sidekiq.redis {|it| it.del "#{PREFIX}:pause:#@name" }
+ def unpause
+ redis {|it| it.del "#{PREFIX}:pause:#@name" }
+ end
+
+ def paused?
+ redis {|it| it.get "#{PREFIX}:pause:#@name" }
+ end
+
+ def block
+ redis {|it| it.set "#{PREFIX}:block:#@name", true }
+ end
+
+ def unblock
+ redis {|it| it.del "#{PREFIX}:block:#@name" }
+ end
+
+ def blocking?
+ redis {|it| it.get "#{PREFIX}:block:#@name" }
end
end
end