Sha256: af47d92242687b60a5cfa232d5aa247f3cda3352063d8332c8ba5a2564b5a222

Contents?: true

Size: 1.23 KB

Versions: 1

Compression:

Stored size: 1.23 KB

Contents

module Shoryuken
  # Pulls batches of messages from a queue on behalf of the available
  # processors.
  #
  # Uses +watchdog+, +logger+ and +elapsed+, none of which are defined in this
  # class (presumably mixed in by Util -- confirm there).
  class Fetcher
    include Util

    # Largest batch requested in a single receive call.
    FETCH_LIMIT = 10

    # Fetches up to +available_processors+ messages (capped at FETCH_LIMIT)
    # from +queue+.
    #
    # Any StandardError raised while receiving is logged and swallowed, so a
    # failed poll yields an empty batch instead of an exception.
    #
    # @param queue an object responding to +name+ and +options+
    # @param available_processors [Integer] how many messages the caller can take
    # @return [Array] the received messages, or [] when receiving failed
    #   (assuming +watchdog+ returns its block's value -- confirm in Util)
    def fetch(queue, available_processors)
      watchdog('Fetcher#fetch died') do
        started_at = Time.now

        logger.debug { "Looking for new messages in '#{queue}'" }

        begin
          # receive_messages caps the batch size, so no clamping is needed here.
          sqs_msgs = Array(receive_messages(queue, available_processors))
          logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" }
          logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
          sqs_msgs
        rescue => ex
          logger.error { "Error fetching message: #{ex}" }
          # Exception#backtrace is nil when no backtrace was set; guard so the
          # handler itself cannot raise and defeat the "return []" fallback.
          top_frame = Array(ex.backtrace).first
          logger.error { top_frame } if top_frame
          []
        end
      end
    end

    private

    # Builds the receive options and asks the client for at most +limit+
    # messages from +queue+.
    #
    # Options are layered in this order, later entries winning:
    # the configured :receive_message defaults, the values set here, and
    # finally +queue.options+.
    #
    # @param queue an object responding to +name+ and +options+
    # @param limit [Integer] requested batch size; capped at FETCH_LIMIT
    # @return whatever the client's +receive_messages+ returns
    def receive_messages(queue, limit)
      # AWS limits the batch size by 10
      limit = [limit, FETCH_LIMIT].min

      # dup so the shared configuration hash is never mutated by the writes below.
      options = (Shoryuken.options[:aws][:receive_message] || {}).dup
      options[:max_number_of_messages] = limit
      options[:message_attribute_names] = %w(All)
      options[:attribute_names] = %w(All)

      # Per-queue options are merged last and therefore override the above.
      options.merge!(queue.options)

      Shoryuken::Client.queues(queue.name).receive_messages(options)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
shoryuken-2.1.2 lib/shoryuken/fetcher.rb