Sha256: eb0752b5d7ab867517365e491b107ab4ad497577fe51485ed4d7a965595954f9

Contents?: true

Size: 1.8 KB

Versions: 17

Compression:

Stored size: 1.8 KB

Contents

module Shoryuken
  module Polling
    class StrictPriority < BaseStrategy
      def initialize(queues)
        # Priority ordering of the queues, highest priority first
        @queues = queues
        .group_by { |q| q }
        .sort_by { |_, qs| -qs.count }
        .map(&:first)

        # Pause status of the queues, default to past time (unpaused)
        @paused_until = queues
        .each_with_object(Hash.new) { |queue, h| h[queue] = Time.at(0) }

        # Start queues at 0
        reset_next_queue
      end

      def next_queue
        next_queue = next_active_queue
        next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {})
      end

      def messages_found(queue, messages_found)
        if messages_found == 0
          pause(queue)
        else
          reset_next_queue
        end
      end

      def active_queues
        @queues
        .reverse
        .map.with_index(1)
        .reject { |q, _| queue_paused?(q) }
        .reverse
      end

      private

      def next_active_queue
        reset_next_queue if queues_unpaused_since?

        size = @queues.length
        size.times do
          queue = @queues[@next_queue_index]
          @next_queue_index = (@next_queue_index + 1) % size
          return queue unless queue_paused?(queue)
        end

        nil
      end

      def queues_unpaused_since?
        last = @last_unpause_check
        now = @last_unpause_check = Time.now

        last && @paused_until.values.any? { |t| t > last && t <= now }
      end

      def reset_next_queue
        @next_queue_index = 0
      end

      def queue_paused?(queue)
        @paused_until[queue] > Time.now
      end

      def pause(queue)
        return unless delay > 0
        @paused_until[queue] = Time.now + delay
        logger.debug "Paused #{queue}"
      end
    end
  end
end

Version data entries

17 entries across 17 versions & 1 rubygems

Version Path
shoryuken-3.2.3 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.2.2 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.2.1 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.2.0 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.12 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.11 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.10 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.9 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.8 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.7 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.6 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.5 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.4 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.3 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.2 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.1 lib/shoryuken/polling/strict_priority.rb
shoryuken-3.1.0 lib/shoryuken/polling/strict_priority.rb