Sha256: dd90474d396f81f0119402261328330b757d08f727ae0f2c88b1c10acf083399
Contents?: true
Size: 1.47 KB
Versions: 2
Compression:
Stored size: 1.47 KB
Contents
module Sidekiq
  module ThrottledWorker
    module Concurrency
      class << self
        def limit?(worker_class, jid)
          # only allow one worker check limit
          concurrency = worker_class.get_sidekiq_options["concurrency"]
          ttl = (worker_class.get_sidekiq_options["concurrency_ttl"] || 900) # assume worker should finish run in 15 minutes, and worker may be block completely max for 15 minutes in extreme case
          check_limit_key = "sidekiq:throttled_worker:check_limit:#{worker_class}"
          now = Time.now.to_f
          Sidekiq.redis do |conn|
            return true unless conn.set(check_limit_key, 1, nx: true, ex: 60)
            conn.zremrangebyscore(concurrency_key(worker_class), "-inf", "(#{now}")
            return true if conn.zcard(concurrency_key(worker_class)) >= concurrency && conn.zscore(concurrency_key(worker_class), jid).nil?
            conn.zadd(concurrency_key(worker_class), (now + ttl).to_i, jid)
            conn.expire(concurrency_key(worker_class), (now + ttl).to_i)
            false
          end
        ensure
          Sidekiq.redis do |conn|
            conn.del(check_limit_key)
          end
        end

        def finalize!(worker_class, jid)
          Sidekiq.redis do |conn|
            conn.zrem(concurrency_key(worker_class), jid)
          end
        end

        private

        def concurrency_key(worker_class)
          "sidekiq:throttled_worker:concurrency:#{worker_class}"
        end
      end
    end
  end
end
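The slot-accounting logic in `limit?` and `finalize!` can be tried without Redis. The following is a minimal in-memory sketch, not part of the gem: the class name `InMemoryConcurrency`, its `limit:`/`ttl:` keywords, the injectable `now:` argument, and the `size` helper are all hypothetical, introduced only for illustration. A plain Hash stands in for the sorted set, and the `check_limit` lock taken with `SET ... NX` is deliberately left out.

```ruby
# Illustrative stand-in for the Redis sorted set used by Concurrency.
# Hypothetical class; it mirrors only the slot bookkeeping, not the NX lock.
class InMemoryConcurrency
  def initialize(limit:, ttl: 900)
    @limit = limit
    @ttl = ttl
    @slots = {} # jid => expiry timestamp (plays the role of the sorted-set score)
  end

  # Returns true when the job should be throttled, false when it holds a slot.
  def limit?(jid, now: Time.now.to_f)
    # Drop slots whose expiry lies strictly before now,
    # like ZREMRANGEBYSCORE key -inf (now.
    @slots.delete_if { |_, expires_at| expires_at < now }

    # Throttle when all slots are taken, unless this jid already owns one.
    return true if @slots.size >= @limit && !@slots.key?(jid)

    # Claim (or refresh) the slot, as ZADD does with score now + ttl.
    @slots[jid] = (now + @ttl).to_i
    false
  end

  # Release the slot when the job finishes, like ZREM.
  def finalize!(jid)
    @slots.delete(jid)
  end

  # Number of slots currently held, like ZCARD.
  def size
    @slots.size
  end
end
```

With `limit: 2`, a third distinct jid is throttled until one of the first two calls `finalize!` or its slot expires; a jid that already holds a slot is never throttled by its own entry.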
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
sidekiq-throttled-worker-0.1.1 | lib/sidekiq/throttled_worker/concurrency.rb |
sidekiq-throttled-worker-0.1.0 | lib/sidekiq/throttled_worker/concurrency.rb |