Sha256: aab7ee594c18b96b08ad34a1dd65590c1147cd3eb27b16c9a9d91be575acfa0a
Contents?: true
Size: 1.21 KB
Versions: 4
Compression:
Stored size: 1.21 KB
Contents
module Shoryuken
  # Retrieves batches of messages for a queue through Shoryuken::Client.
  #
  # `logger` and `elapsed` are not defined in this class; presumably they
  # come from the included Util module (verify against Util).
  class Fetcher
    include Util

    # Largest batch size ever requested in a single receive call.
    FETCH_LIMIT = 10

    # Receives messages for +queue+, asking for at most
    # +available_processors+ messages (never more than FETCH_LIMIT).
    #
    # @param queue queue descriptor; must respond to +name+ and +options+
    # @param available_processors [Integer] how many messages the caller can take
    # @return [Array] the received messages, or an empty array when the
    #   receive call raised a StandardError (the error is logged, not re-raised)
    def fetch(queue, available_processors)
      started_at = Time.now

      logger.debug { "Looking for new messages in #{queue}" }

      begin
        batch_size = available_processors
        batch_size = FETCH_LIMIT if batch_size > FETCH_LIMIT

        # Array() normalizes a nil or single-object response into an array.
        received = Array(receive_messages(queue, batch_size))

        unless received.empty?
          logger.info { "Found #{received.size} messages for #{queue.name}" }
        end

        logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" }

        received
      rescue => ex
        # Best effort: report the failure and hand back an empty batch.
        logger.error { "Error fetching message: #{ex.message}" }
        logger.error { ex.backtrace.join("\n") } if ex.backtrace

        []
      end
    end

    private

    # Builds the receive options and issues the receive call for +queue+.
    #
    # Options start from a copy of Shoryuken.sqs_client_receive_message_opts,
    # then the batch size and the "All" attribute selectors are set, and
    # finally the queue's own options are merged on top, so they win on
    # any conflicting key.
    #
    # @param queue queue descriptor; must respond to +name+ and +options+
    # @param limit [Integer] requested batch size, capped at FETCH_LIMIT
    # @return whatever the client queue's +receive_messages+ returns
    def receive_messages(queue, limit)
      # AWS limits the batch size by 10
      batch_size = limit
      batch_size = FETCH_LIMIT if batch_size > FETCH_LIMIT

      options = Shoryuken.sqs_client_receive_message_opts.to_h.dup
      options.update(
        max_number_of_messages: batch_size,
        message_attribute_names: %w(All),
        attribute_names: %w(All)
      )
      options.merge!(queue.options)

      sqs_queue = Shoryuken::Client.queues(queue.name)
      sqs_queue.receive_messages(options)
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
shoryuken-3.0.11 | lib/shoryuken/fetcher.rb |
shoryuken-3.0.10 | lib/shoryuken/fetcher.rb |
shoryuken-3.0.9 | lib/shoryuken/fetcher.rb |
shoryuken-3.0.8 | lib/shoryuken/fetcher.rb |