Sha256: aab7ee594c18b96b08ad34a1dd65590c1147cd3eb27b16c9a9d91be575acfa0a

Contents?: true

Size: 1.21 KB

Versions: 4

Compression:

Stored size: 1.21 KB

Contents

module Shoryuken
  # Pulls batches of messages from a queue through Shoryuken::Client.
  #
  # NOTE(review): +logger+ and +elapsed+ are not defined in this file;
  # presumably they are supplied by the Util mixin -- confirm against Util.
  class Fetcher
    include Util

    # Upper bound applied to the number of messages requested per receive call.
    FETCH_LIMIT = 10

    # Receives messages from +queue+, asking for at most
    # +available_processors+ of them and never more than FETCH_LIMIT.
    #
    # Any StandardError raised while receiving is logged (message, then
    # backtrace when one is present) and swallowed; the caller gets an empty
    # array in that case.
    #
    # @param queue [#name, #options] queue to read from; it is interpolated
    #   into log lines both directly and via +queue.name+
    # @param available_processors [Integer] how many messages the caller can
    #   currently take
    # @return [Array] the received messages, or +[]+ when nothing was
    #   received or an error occurred
    def fetch(queue, available_processors)
      started_at = Time.now

      logger.debug { "Looking for new messages in #{queue}" }

      begin
        batch_size =
          if available_processors > FETCH_LIMIT
            FETCH_LIMIT
          else
            available_processors
          end

        # Array() normalizes a nil or single-object result into an array.
        messages = Array(receive_messages(queue, batch_size))

        unless messages.empty?
          logger.info { "Found #{messages.size} messages for #{queue.name}" }
        end

        logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" }

        messages
      rescue => error
        logger.error { "Error fetching message: #{error.message}" }

        trace = error.backtrace
        logger.error { trace.join("\n") } unless trace.nil?

        []
      end
    end

    private

    # Builds the receive options and issues the receive call for +queue+.
    #
    # Options are layered in this order, later layers winning on key clashes:
    # a copy of Shoryuken.sqs_client_receive_message_opts, the three values
    # set here, then +queue.options+.
    #
    # @param queue [#name, #options] queue to read from
    # @param limit [Integer] requested batch size; capped at FETCH_LIMIT
    # @return whatever the queue object's +receive_messages+ returns
    def receive_messages(queue, limit)
      # AWS limits the batch size by 10
      max_messages = limit
      max_messages = FETCH_LIMIT if limit > FETCH_LIMIT

      per_call = {
        max_number_of_messages: max_messages,
        message_attribute_names: %w(All),
        attribute_names: %w(All)
      }

      # merge (non-bang) returns fresh hashes, so the globally configured
      # options and queue.options are left untouched.
      request = Shoryuken.sqs_client_receive_message_opts.to_h
                         .merge(per_call)
                         .merge(queue.options)

      Shoryuken::Client.queues(queue.name).receive_messages(request)
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
shoryuken-3.0.11 lib/shoryuken/fetcher.rb
shoryuken-3.0.10 lib/shoryuken/fetcher.rb
shoryuken-3.0.9 lib/shoryuken/fetcher.rb
shoryuken-3.0.8 lib/shoryuken/fetcher.rb