Sha256: fe14dd1c9448e57ff58c69bc3d16e6d2a0cdc930cebc7f2188e56b46457d4734

Contents?: true

Size: 1.26 KB

Versions: 2

Compression:

Stored size: 1.26 KB

Contents

module Sidekiq
  module ThrottledWorker
    # Per-worker-class concurrency limiter backed by a Redis sorted set.
    #
    # Every job that is admitted is stored in the sorted set under its jid,
    # scored with the epoch second at which its slot is considered stale.
    # Stale members are purged at the start of every check, so a job that
    # never calls {finalize!} releases its slot once its TTL has elapsed.
    module Concurrency
      class << self
        # Decides whether the job must be held back because the worker class
        # already has its configured number of jobs registered.
        #
        # Reads two Sidekiq options from the worker class:
        # * "concurrency"     - maximum number of simultaneously registered jobs.
        # * "concurrency_ttl" - seconds a registration stays valid (default 900,
        #   i.e. a job is assumed to finish, or be stuck at most, 15 minutes).
        #
        # NOTE: the Redis commands below are issued one by one (no MULTI/Lua),
        # so two simultaneous callers can both pass the check.
        #
        # @param worker_class [Class] worker responding to +get_sidekiq_options+
        # @param jid [String] job id used as the sorted-set member
        # @return [Boolean] true when the job is over the limit, false when it
        #   was registered (or re-registered) and may run
        def limit?(worker_class, jid)
          options = worker_class.get_sidekiq_options
          concurrency = options["concurrency"]
          # Without a configured limit there is nothing to enforce; comparing
          # the set size against nil would raise.
          return false if concurrency.nil?

          ttl = options["concurrency_ttl"] || 900
          now = Time.now.to_f
          key = concurrency_key(worker_class)

          Sidekiq::ThrottledWorker.redis.with do |conn|
            # Drop registrations whose expiry score is strictly before now.
            conn.zremrangebyscore(key, "-inf", "(#{now}")

            # A jid that already holds a slot is let through (and refreshed)
            # even when the set is full.
            return true if conn.zcard(key) >= concurrency && conn.zscore(key, jid).nil?

            conn.zadd(key, (now + ttl).to_i, jid)
            # EXPIRE takes a relative number of seconds, not an absolute
            # timestamp; round up so the key outlives its newest member.
            conn.expire(key, ttl.ceil)

            false
          end
        end

        # Releases the slot held by the given job.
        #
        # @param worker_class [Class] worker class the job belongs to
        # @param jid [String] job id to remove from the sorted set
        # @return [Object] whatever the Redis client returns for ZREM
        def finalize!(worker_class, jid)
          Sidekiq::ThrottledWorker.redis.with do |conn|
            conn.zrem(concurrency_key(worker_class), jid)
          end
        end

        private

        # Builds the Redis key of the sorted set for a worker class.
        def concurrency_key(worker_class)
          "sidekiq:throttled_worker:concurrency:#{worker_class}"
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
sidekiq-throttled-worker-0.1.3 lib/sidekiq/throttled_worker/concurrency.rb
sidekiq-throttled-worker-0.1.2 lib/sidekiq/throttled_worker/concurrency.rb