Sha256: bd34af61899f424848b5679c9d0307ae508c3a1a1c5722ae59f0cbaa4eaa35f2
Contents?: true
Size: 1.23 KB
Versions: 4
Compression:
Stored size: 1.23 KB
Contents
module Shoryuken
  # Pulls batches of messages from a queue on behalf of a processing group.
  #
  # Receive options are resolved per call: options configured for the queue
  # name win over options configured for the group, and the queue's own
  # +options+ are merged last so they override everything else.
  class Fetcher
    include Util

    # Hard ceiling on how many messages a single receive call may request.
    # NOTE(review): presumably mirrors the SQS per-request maximum - confirm.
    FETCH_LIMIT = 10

    # @param group the group this fetcher serves; used as the fallback key
    #   when looking up configured receive options
    def initialize(group)
      @group = group
    end

    # Receives messages from +queue+, logging what was found and how long the
    # fetch took.
    #
    # @param queue an object responding to +name+ and +options+
    # @param limit [Integer] maximum number of messages the caller wants;
    #   capped at FETCH_LIMIT
    # @return [Array] the received messages (empty when nothing was received)
    def fetch(queue, limit)
      started_at = Time.now

      logger.debug { "Looking for new messages in #{queue}" }

      batch_size = [FETCH_LIMIT, limit].min
      messages = Array(receive_messages(queue, batch_size))

      unless messages.empty?
        logger.info { "Found #{messages.size} messages for #{queue.name}" }
      end
      logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" }

      messages
    end

    private

    # Builds the receive options for +queue+ and issues the receive call.
    #
    # The batch size and the "All" attribute selectors are written over any
    # configured values; +queue.options+ is merged afterwards and therefore
    # has the final say on every key it defines.
    def receive_messages(queue, limit)
      request = receive_options(queue)

      # The batch size is computed from the configured options before they
      # are overwritten by this merge.
      request.merge!(
        max_number_of_messages: max_number_of_messages(limit, request),
        message_attribute_names: %w[All],
        attribute_names: %w[All]
      )
      request.merge!(queue.options)

      Shoryuken::Client.queues(queue.name).receive_messages(request)
    end

    # Smallest of the requested limit, FETCH_LIMIT and the configured
    # +:max_number_of_messages+ (nil candidates are ignored).
    def max_number_of_messages(limit, options)
      candidates = [limit, FETCH_LIMIT, options[:max_number_of_messages]]
      candidates.compact.min
    end

    # Looks up configured receive options by queue name, falling back to the
    # fetcher's group. Returns a shallow copy (an empty Hash when nothing is
    # configured) because the caller mutates the result.
    def receive_options(queue)
      configured = Shoryuken.sqs_client_receive_message_opts[queue.name] ||
                   Shoryuken.sqs_client_receive_message_opts[@group]

      configured.to_h.dup
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
shoryuken-3.1.10 | lib/shoryuken/fetcher.rb |
shoryuken-3.1.9 | lib/shoryuken/fetcher.rb |
shoryuken-3.1.8 | lib/shoryuken/fetcher.rb |
shoryuken-3.1.7 | lib/shoryuken/fetcher.rb |