Sha256: 9e9ee5b4b5cd0eeff73518e6d79f073523d06ea3876cd9b3b5bcd33868f1cc97

Contents?: true

Size: 1.57 KB

Versions: 22

Compression:

Stored size: 1.57 KB

Contents

module Shoryuken
  module Polling
    class WeightedRoundRobin < BaseStrategy
      def initialize(queues)
        @initial_queues = queues
        @queues = queues.dup.uniq
        @paused_queues = []
      end

      def next_queue
        unpause_queues
        queue = @queues.shift
        return nil if queue.nil?

        @queues << queue
        QueueConfiguration.new(queue, {})
      end

      def messages_found(queue, messages_found)
        if messages_found == 0
          pause(queue)
          return
        end

        maximum_weight = maximum_queue_weight(queue)
        current_weight = current_queue_weight(queue)
        if maximum_weight > current_weight
          logger.info { "Increasing #{queue} weight to #{current_weight + 1}, max: #{maximum_weight}" }
          @queues << queue
        end
      end

      def active_queues
        unparse_queues(@queues)
      end

      private

      def pause(queue)
        return unless @queues.delete(queue)
        @paused_queues << [Time.now + delay, queue]
        logger.debug "Paused #{queue}"
      end

      def unpause_queues
        return if @paused_queues.empty?
        return if Time.now < @paused_queues.first[0]
        pause = @paused_queues.shift
        @queues << pause[1]
        logger.debug "Unpaused #{pause[1]}"
      end

      def current_queue_weight(queue)
        queue_weight(@queues, queue)
      end

      def maximum_queue_weight(queue)
        queue_weight(@initial_queues, queue)
      end

      def queue_weight(queues, queue)
        queues.count { |q| q == queue }
      end
    end
  end
end

Version data entries

22 entries across 22 versions & 1 rubygems

Version Path
shoryuken-4.0.2 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-4.0.1 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-4.0.0 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.3.1 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.3.0 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.2.3 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.2.2 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.2.1 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.2.0 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.12 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.11 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.10 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.9 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.8 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.7 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.6 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.5 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.4 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.3 lib/shoryuken/polling/weighted_round_robin.rb
shoryuken-3.1.2 lib/shoryuken/polling/weighted_round_robin.rb