Sha256: 106c976dcdb639fc41c709992336a113ce971f811e308a2758d3af9141efcf1e
Contents?: true
Size: 1.91 KB
Versions: 2
Compression:
Stored size: 1.91 KB
Contents
module Shoryuken
  # Pulls messages for a queue and hands them to the manager that owns this
  # fetcher. All calls back into the manager go through its `async` proxy.
  class Fetcher
    include Celluloid
    include Util

    # Upper bound on the number of messages requested per receive call
    # (AWS limits the batch size by 10).
    FETCH_LIMIT = 10

    # @param manager the object that receives `assign`, `pause_queue!`,
    #   `rebalance_queue_weight!` and `dispatch` calls via `manager.async`
    def initialize(manager)
      @manager = manager
    end

    # Requests up to `limit` messages (never more than FETCH_LIMIT) from `queue`.
    #
    # The request starts from the configured `Shoryuken.options[:aws][:receive_message]`
    # settings (treated as empty when unset) and always overrides the message
    # count and asks for all attributes and message attributes.
    #
    # @param queue the queue identifier passed to `Shoryuken::Client.queues`
    # @param limit the number of messages wanted
    # @return whatever the queue's `receive_messages` returns
    def receive_messages(queue, limit)
      limit = FETCH_LIMIT if limit > FETCH_LIMIT

      overrides = {
        max_number_of_messages: limit,
        message_attribute_names: %w(All),
        attribute_names: %w(All)
      }
      request = Shoryuken.options[:aws][:receive_message].to_h.merge(overrides)

      Shoryuken::Client.queues(queue).receive_messages(request)
    end

    # Runs one fetch cycle for `queue`.
    #
    # When messages are found they are assigned to the manager and the queue
    # weight is rebalanced; otherwise the queue is paused. In both cases, and
    # also when the cycle raises, the manager is asked to dispatch again.
    #
    # @param queue the queue to poll
    # @param available_processors how many messages to ask for when the queue's
    #   worker does not receive in batches (batch workers always ask for FETCH_LIMIT)
    def fetch(queue, available_processors)
      # `watchdog`, `logger` and `elapsed` are not defined in this class;
      # presumably they come from Util.
      watchdog('Fetcher#fetch died') do
        started_at = Time.now

        logger.debug "Looking for new messages in '#{queue}'"

        begin
          batch_mode = Shoryuken.worker_registry.batch_receive_messages?(queue)
          wanted = batch_mode ? FETCH_LIMIT : available_processors
          received = Array(receive_messages(queue, wanted))

          if received.none?
            logger.debug "No message found for '#{queue}'"

            @manager.async.pause_queue!(queue)
          else
            logger.info "Found #{received.size} messages for '#{queue}'"

            hand_over(queue, received, batch_mode)
            @manager.async.rebalance_queue_weight!(queue)
          end

          @manager.async.dispatch

          logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
        rescue => error
          logger.error "Error fetching message: #{error}"
          logger.error error.backtrace.first

          # Keep the manager cycling even though this fetch failed.
          @manager.async.dispatch
        end
      end
    end

    private

    # Assigns the received messages to the manager: as one tagged batch when
    # `batch_mode` is truthy, otherwise one assignment per message.
    def hand_over(queue, received, batch_mode)
      if batch_mode
        @manager.async.assign(queue, patch_sqs_msgs!(received))
      else
        received.each { |message| @manager.async.assign(queue, message) }
      end
    end

    # Gives the message collection itself a `message_id`, so a whole batch can
    # be passed where a single message would be, then returns that same collection.
    def patch_sqs_msgs!(sqs_msgs)
      def sqs_msgs.message_id
        "batch-with-#{size}-messages"
      end

      sqs_msgs
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
shoryuken-1.0.2 | lib/shoryuken/fetcher.rb |
shoryuken-1.0.1 | lib/shoryuken/fetcher.rb |