Sha256: 106c976dcdb639fc41c709992336a113ce971f811e308a2758d3af9141efcf1e

Contents?: true

Size: 1.91 KB

Versions: 2

Compression:

Stored size: 1.91 KB

Contents

module Shoryuken
  class Fetcher
    include Celluloid
    include Util

    FETCH_LIMIT = 10

    def initialize(manager)
      @manager = manager
    end

    def receive_messages(queue, limit)
      # AWS limits the batch size by 10
      limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit

      options = Shoryuken.options[:aws][:receive_message].to_h.dup
      options[:max_number_of_messages] = limit
      options[:message_attribute_names] = %w(All)
      options[:attribute_names] = %w(All)

      Shoryuken::Client.queues(queue).receive_messages options
    end

    def fetch(queue, available_processors)
      watchdog('Fetcher#fetch died') do
        started_at = Time.now

        logger.debug "Looking for new messages in '#{queue}'"

        begin
          batch = Shoryuken.worker_registry.batch_receive_messages?(queue)
          limit = batch ? FETCH_LIMIT : available_processors

          if (sqs_msgs = Array(receive_messages(queue, limit))).any?
            logger.info "Found #{sqs_msgs.size} messages for '#{queue}'"

            if batch
              @manager.async.assign(queue, patch_sqs_msgs!(sqs_msgs))
            else
              sqs_msgs.each { |sqs_msg| @manager.async.assign(queue, sqs_msg) }
            end

            @manager.async.rebalance_queue_weight!(queue)
          else
            logger.debug "No message found for '#{queue}'"

            @manager.async.pause_queue!(queue)
          end

          @manager.async.dispatch

          logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
        rescue => ex
          logger.error "Error fetching message: #{ex}"
          logger.error ex.backtrace.first

          @manager.async.dispatch
        end
      end

    end

    private

    def patch_sqs_msgs!(sqs_msgs)
      sqs_msgs.instance_eval do
        def message_id
          "batch-with-#{size}-messages"
        end
      end

      sqs_msgs
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
shoryuken-1.0.2 lib/shoryuken/fetcher.rb
shoryuken-1.0.1 lib/shoryuken/fetcher.rb