Sha256: dcc3b5ab1b23dcba6ccbd21aab482c79aad370ca2c8e8cfc868d0cf22a828523

Contents?: true

Size: 1.18 KB

Versions: 4

Compression:

Stored size: 1.18 KB

Contents

class Sidekiq::LimitFetch
  class Queues
    THREAD_KEY = :acquired_queues
    attr_reader :selector

    def initialize(options)
      @queues = options[:queues]
      options[:strict] ? strict_order! : weighted_order!

      set_selector options[:global]
      set_limits options[:limits]
    end

    def acquire
      @selector.acquire(ordered_queues)
        .tap {|it| save it }
        .map {|it| "queue:#{it}" }
    end
    
    def release_except(full_name)
      @selector.release restore.delete_if {|name| full_name.to_s.include? name }
    end

    private

    def set_selector(global)
      @selector = global ? Global::Selector : Local::Selector
    end

    def set_limits(limits)
      ordered_queues.each do |name|
        Sidekiq::Queue[name].tap do |it|
          it.limit = (limits || {})[name]
        end
      end
    end

    def strict_order!
      @queues.uniq!
      def ordered_queues; @queues end
    end

    def weighted_order!
      def ordered_queues; @queues.shuffle.uniq end
    end

    def save(queues)
      Thread.current[THREAD_KEY] = queues
    end

    def restore
      Thread.current[THREAD_KEY]
    ensure
      Thread.current[THREAD_KEY] = nil
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
sidekiq-limit_fetch-0.9 lib/sidekiq/limit_fetch/queues.rb
sidekiq-limit_fetch-0.8 lib/sidekiq/limit_fetch/queues.rb
sidekiq-limit_fetch-0.7 lib/sidekiq/limit_fetch/queues.rb
sidekiq-limit_fetch-0.6 lib/sidekiq/limit_fetch/queues.rb