Sha256: 975f283bfb189701e3de92883acd4c423cebe579af45d1f126dba8709f927862

Contents?: true

Size: 1.88 KB

Versions: 3

Compression:

Stored size: 1.88 KB

Contents

module Sidekiq::LimitFetch::Queues
  extend self

  THREAD_KEY = :acquired_queues

  def start(options)
    @queues    = options[:queues]
    @dynamic   = options[:dynamic]

    options[:strict] ? strict_order! : weighted_order!

    set :process_limit, options[:process_limits]
    set :limit, options[:limits]
    set_blocks options[:blocking]
  end

  def acquire
    selector.acquire(ordered_queues, namespace)
      .tap {|it| save it }
      .map {|it| "queue:#{it}" }
  end

  def release_except(full_name)
    queues = restore
    queues.delete full_name[/queue:(.*)/, 1] if full_name
    selector.release queues, namespace
  end

  def dynamic?
    @dynamic
  end

  def add(queues)
    queues.each do |queue|
      @queues.push queue unless @queues.include? queue
    end
  end

  def strict_order!
    @queues.uniq!
    def ordered_queues; @queues end
  end

  def weighted_order!
    def ordered_queues; @queues.shuffle.uniq end
  end

  def namespace
    @namespace ||= Sidekiq.redis do |it|
      if it.respond_to?(:namespace) and it.namespace
        it.namespace + ':'
      else
        ''
      end
    end
  end

  private

  def selector
    Sidekiq::LimitFetch::Global::Selector
  end

  def set(limit_type, limits)
    limits ||= {}
    each_queue do |queue|
      limit = limits[queue.name.to_s] || limits[queue.name.to_sym]
      queue.send "#{limit_type}=", limit unless queue.limit_changed?
    end
  end

  def set_blocks(blocks)
    each_queue(&:unblock)

    blocks.to_a.each do |it|
      if it.is_a? Array
        it.each {|name| Sidekiq::Queue[name].block_except it }
      else
        Sidekiq::Queue[it].block
      end
    end
  end

  def save(queues)
    Thread.current[THREAD_KEY] = queues
  end

  def restore
    Thread.current[THREAD_KEY] || []
  ensure
    Thread.current[THREAD_KEY] = nil
  end

  def each_queue
    @queues.uniq.each {|it| yield Sidekiq::Queue[it] }
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
sidekiq-limit_fetch-3.1.0 lib/sidekiq/limit_fetch/queues.rb
sidekiq-limit_fetch-3.0.1 lib/sidekiq/limit_fetch/queues.rb
sidekiq-limit_fetch-3.0.0 lib/sidekiq/limit_fetch/queues.rb