Sha256: 33d4cfd611d7c7e62bc4d9acd197e2a39ed1c99eccf580e2627bdbbb0c5a1484
Contents?: true
Size: 1.45 KB
Versions: 1
Compression:
Stored size: 1.45 KB
Contents
class Sidekiq::LimitFetch class Queues THREAD_KEY = :acquired_queues attr_reader :selector def initialize(options) @queues = options[:queues] options[:strict] ? strict_order! : weighted_order! set_selector options[:global] set_limits options[:limits] set_blocks options[:blocking] end def acquire @selector.acquire(ordered_queues) .tap {|it| save it } .map {|it| "queue:#{it}" } end def release_except(full_name) queues = restore queues.delete full_name[/queue:(.*)/, 1] if full_name @selector.release queues end private def set_selector(global) @selector = global ? Global::Selector : Local::Selector end def set_limits(limits) return unless limits limits.each do |name, limit| Sidekiq::Queue[name].limit = limit end end def set_blocks(blocks) blocks.to_a.each do |it| if it.is_a? Array it.each {|name| Sidekiq::Queue[name].block_except it } else Sidekiq::Queue[it].block end end end def strict_order! @queues.uniq! def ordered_queues; @queues end end def weighted_order! def ordered_queues; @queues.shuffle.uniq end end def save(queues) Thread.current[THREAD_KEY] = queues end def restore Thread.current[THREAD_KEY] || [] ensure Thread.current[THREAD_KEY] = nil end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
sidekiq-limit_fetch-1.4 | lib/sidekiq/limit_fetch/queues.rb |