Sha256: bd34af61899f424848b5679c9d0307ae508c3a1a1c5722ae59f0cbaa4eaa35f2

Contents?: true

Size: 1.23 KB

Versions: 4

Compression:

Stored size: 1.23 KB

Contents

module Shoryuken
  # Receives batches of messages from a queue through Shoryuken::Client.
  #
  # `logger` and `elapsed` are not defined in this class; presumably they are
  # provided by the included Util module -- confirm against Util.
  class Fetcher
    include Util

    # Hard cap on how many messages a single receive call may request.
    # NOTE(review): presumably mirrors the SQS per-request maximum -- confirm.
    FETCH_LIMIT = 10

    # group - key used as the fallback lookup into
    #         Shoryuken.sqs_client_receive_message_opts when no entry exists
    #         for the queue name (see #receive_options).
    def initialize(group)
      @group = group
    end

    # Receives up to `limit` messages (never more than FETCH_LIMIT) from `queue`.
    #
    # queue - object responding to `name` and `options`; it is also
    #         interpolated directly into the debug log lines.
    # limit - maximum number of messages the caller wants.
    #
    # Returns an Array (the receive result coerced through Kernel#Array, so a
    # nil result becomes an empty Array).
    def fetch(queue, limit)
      began_at = Time.now

      logger.debug { "Looking for new messages in #{queue}" }

      batch_size = [FETCH_LIMIT, limit].min
      received   = Array(receive_messages(queue, batch_size))

      # Only report at info level when something was actually received.
      unless received.empty?
        logger.info { "Found #{received.size} messages for #{queue.name}" }
      end

      logger.debug { "Fetcher for #{queue} completed in #{elapsed(began_at)} ms" }

      received
    end

    private

    # Builds the receive request for `queue` and issues it.
    #
    # Starts from the configured receive options, sets the batch size and asks
    # for all message attributes and all attributes, then merges
    # `queue.options` last so that its keys win over anything set here.
    def receive_messages(queue, limit)
      request = receive_options(queue)

      request.merge!(
        max_number_of_messages:  max_number_of_messages(limit, request),
        message_attribute_names: %w(All),
        attribute_names:         %w(All)
      )
      request.merge!(queue.options)

      sqs_queue = Shoryuken::Client.queues(queue.name)
      sqs_queue.receive_messages(request)
    end

    # Smallest of the requested limit, FETCH_LIMIT and the configured
    # :max_number_of_messages option. nil entries (e.g. an unconfigured
    # option) are dropped before comparing.
    def max_number_of_messages(limit, options)
      candidates = [limit, FETCH_LIMIT, options[:max_number_of_messages]]
      candidates.compact.min
    end

    # Looks up the configured receive options, preferring the entry keyed by
    # the queue name and falling back to the entry keyed by @group.
    #
    # Returns a fresh Hash (empty when nothing is configured) so callers can
    # mutate it without touching the shared configuration object itself.
    def receive_options(queue)
      configured =
        Shoryuken.sqs_client_receive_message_opts[queue.name] ||
        Shoryuken.sqs_client_receive_message_opts[@group]

      configured.to_h.dup
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
shoryuken-3.1.10 lib/shoryuken/fetcher.rb
shoryuken-3.1.9 lib/shoryuken/fetcher.rb
shoryuken-3.1.8 lib/shoryuken/fetcher.rb
shoryuken-3.1.7 lib/shoryuken/fetcher.rb