Sha256: a445a6301127d0ec01afb3453cd5b4ddc28546005652909bcd9ae35239e60901

Contents?: true

Size: 1.47 KB

Versions: 8

Compression:

Stored size: 1.47 KB

Contents

module Eventboss
  module Polling
    class Basic
      PAUSE_AFTER_EMPTY = 2 # seconds

      def initialize(queues, timer = Time)
        @queues = queues.to_a
        @timer = timer
        @paused_until = @queues.each_with_object(Hash.new) do |queue, hash|
          hash[queue] = @timer.at(0)
        end

        reset_next_queue
      end

      def next_queue
        next_active_queue
      end

      def messages_found(queue, messages_count)
        if messages_count == 0
          pause(queue)
        else
          reset_next_queue
        end
      end

      def active_queues
        @queues.reject { |q, _| queue_paused?(q) }
      end

      private

      def next_active_queue
        reset_next_queue if queues_unpaused_since?

        size = @queues.length
        size.times do
          queue = @queues[@next_queue_index]
          @next_queue_index = (@next_queue_index + 1) % size
          return queue unless queue_paused?(queue)
        end

        nil
      end

      def queues_unpaused_since?
        last = @last_unpause_check
        now = @last_unpause_check = @timer.now

        last && @paused_until.values.any? { |t| t > last && t <= now }
      end

      def reset_next_queue
        @next_queue_index = 0
      end

      def queue_paused?(queue)
        @paused_until[queue] > @timer.now
      end

      def pause(queue)
        return unless PAUSE_AFTER_EMPTY > 0
        @paused_until[queue] = @timer.now + PAUSE_AFTER_EMPTY
      end
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
eventboss-1.1.0 lib/eventboss/polling/basic.rb
eventboss-1.0.6 lib/eventboss/polling/basic.rb
eventboss-1.0.5 lib/eventboss/polling/basic.rb
eventboss-1.0.4 lib/eventboss/polling/basic.rb
eventboss-1.0.3 lib/eventboss/polling/basic.rb
eventboss-1.0.2 lib/eventboss/polling/basic.rb
eventboss-1.0.1 lib/eventboss/polling/basic.rb
eventboss-1.0.0 lib/eventboss/polling/basic.rb