Sha256: cf9d62f8c8a30d02fbf29bc794cfb7aadcffee9d4d0de663767162f6c58c5aa3
Contents?: true
Size: 1.74 KB
Versions: 1
Compression:
Stored size: 1.74 KB
Contents
# internal
require "sidekiq/throttled/strategy/script"

module Sidekiq
  module Throttled
    class Strategy
      # Concurrency throttling strategy.
      #
      # Tracks the ids of jobs currently in progress under a single Redis key
      # (`"<base_key>:concurrency"`) and reports a job as throttled when the
      # LUA script below answers `1`.
      class Concurrency
        # LUA script used to limit fetch concurrency.
        # The logic behind the scenes can be described with the following
        # pseudo code:
        #
        #     return 1 if @limit <= LLEN(@key)
        #
        #     PUSH(@key, @jid)
        #     return 0
        #
        # NOTE(review): the pseudo code uses list vocabulary (LLEN / PUSH),
        # while {#count} and {#finalize!} below issue the set commands SCARD
        # and SREM against the same key - presumably the script works on a
        # set as well; confirm against `concurrency.lua`.
        SCRIPT = Script.new(File.read("#{__dir__}/concurrency.lua"))
        private_constant :SCRIPT

        # @!attribute [r] limit
        #   @return [Integer] Amount of allowed concurrent job processors
        attr_reader :limit

        # Builds a strategy bound to the Redis key derived from `base_key`.
        #
        # @param [#to_s] base_key Prefix of the Redis key; `":concurrency"`
        #   is appended to it
        # @param [Hash] opts
        # @option opts [#to_i] :limit Amount of allowed concurrent job
        #   processors running for the given key (required)
        # @option opts [#to_i] :ttl (15.minutes) Concurrency lock TTL
        # @raise [KeyError] when `opts` carries no `:limit`
        def initialize(base_key, opts)
          @key   = "#{base_key}:concurrency".freeze
          # The script receives its keys as an array; build it once here
          # instead of on every `throttled?` call.
          @keys  = [@key]
          @limit = opts.fetch(:limit).to_i
          @ttl   = opts.fetch(:ttl, 900).to_i
        end

        # Runs the LUA script with the limit, the TTL and the stringified
        # job id as arguments.
        #
        # @param [#to_s] jid Job id
        # @return [Boolean] whether the job is throttled or not
        def throttled?(jid)
          verdict = SCRIPT.eval(@keys, [@limit, @ttl, jid.to_s])
          verdict == 1
        end

        # @return [Integer] Current count of jobs (SCARD of the key)
        def count
          cardinality = Sidekiq.redis { |conn| conn.scard(@key) }
          cardinality.to_i
        end

        # Resets count of jobs by deleting the key.
        #
        # NOTE(review): documented as void, yet the method actually returns
        # the DEL reply coerced with `to_i`.
        #
        # @return [void]
        def reset!
          deleted = Sidekiq.redis { |conn| conn.del(@key) }
          deleted.to_i
        end

        # Remove jid from the pool of jobs in progress.
        #
        # @param [#to_s] jid Job id
        # @return [void]
        def finalize!(jid)
          Sidekiq.redis do |conn|
            conn.srem(@key, jid.to_s)
          end
        end
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
sidekiq-throttled-0.1.0 | lib/sidekiq/throttled/strategy/concurrency.rb |