Sha256: e25ee62e041abbacc44b21bd98cb2c8ef945113ec05d8730b62e530eae732a89

Contents?: true

Size: 1.81 KB

Versions: 3

Compression:

Stored size: 1.81 KB

Contents

require 'sidekiq'
require 'celluloid'

module Sidekiq
  ##
  # The Fetcher blocks on Redis, waiting for a message to process
  # from the queues.  It gets the message and hands it to the Manager
  # to assign to a ready Processor.
  class Fetcher
    include Celluloid
    include Sidekiq::Util

    # Number of seconds passed to Redis#blpop as its timeout, and also
    # the pause taken after a failed fetch before trying again.
    TIMEOUT = 1

    # Prefix prepended to every configured queue name to build the
    # Redis key that is handed to Redis#blpop.
    KEY_PREFIX = 'queue:'.freeze

    # mgr    - the object that receives fetched messages via #assign!.
    # queues - list of queue names; a name may appear more than once,
    #          which gives it more weight when the poll order is
    #          built (see #queues_cmd).
    def initialize(mgr, queues)
      @mgr = mgr
      @queues = queues.map { |q| "#{KEY_PREFIX}#{q}" }
      @unique_queues = @queues.uniq
    end

    # Fetching is straightforward: the Manager makes a fetch
    # request for each idle processor when Sidekiq starts and
    # then issues a new fetch request every time a Processor
    # finishes a message.
    #
    # Because we have to shut down cleanly, we can't block
    # forever and we can't loop forever.  Instead we reschedule
    # a new fetch if the current fetch turned up nothing.
    #
    # Any StandardError raised while fetching is logged, followed by
    # a TIMEOUT pause and a rescheduled fetch, so a failing fetch does
    # not spin in a tight loop.
    def fetch
      watchdog('Fetcher#fetch died') do
        begin
          queue = msg = nil
          # Assigned from inside the block so we do not depend on what
          # Sidekiq.redis itself returns.
          Sidekiq.redis { |conn| queue, msg = conn.blpop(*queues_cmd) }

          if msg
            @mgr.assign!(msg, queue_name(queue))
          else
            after(0) { fetch }
          end
        rescue => ex
          logger.error("Error fetching message: #{ex}")
          logger.error(ex.backtrace.first)
          sleep(TIMEOUT)
          after(0) { fetch }
        end
      end
    end

    private

    # Translates the key returned by Redis#blpop back into the queue
    # name that was given to #initialize.
    #
    # The key is matched against the keys this Fetcher built itself,
    # so a queue name that contains "queue:" (e.g. "mail_queue:urgent")
    # is returned intact rather than being cut at the last "queue:".
    # Matching on the suffix tolerates anything prepended to the key
    # (presumably a Redis namespace -- confirm against the client in
    # use); the longest match wins so that "queue:a:queue:b" is not
    # mistaken for "queue:b".
    #
    # If no known key matches, the key is stripped with the original
    # greedy pattern, i.e. everything through the last "queue:".
    def queue_name(key)
      known = @unique_queues.select { |q| key.end_with?(q) }.max_by(&:size)
      return known[KEY_PREFIX.size..-1] if known

      key.gsub(/.*queue:/, '')
    end

    # Creating the Redis#blpop command takes into account any
    # configured queue weights. By default Redis#blpop returns
    # data from the first queue that has pending elements. We
    # recreate the queue command each time we invoke Redis#blpop
    # to honor weights and avoid queue starvation.
    #
    # Returns an Array of every unique queue key in a freshly
    # randomized order, followed by TIMEOUT as the last element.
    def queues_cmd
      # Sampling from the weighted list makes heavier queues more
      # likely to land near the front of the command.
      queues = @queues.sample(@unique_queues.size).uniq
      # The sample may have missed some queues; append them so every
      # queue is still polled.
      queues.concat(@unique_queues - queues)
      queues << TIMEOUT
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
sidekiq-1.2.0 lib/sidekiq/fetch.rb
sidekiq-1.1.4 lib/sidekiq/fetch.rb
sidekiq-1.1.3 lib/sidekiq/fetch.rb