lib/shoryuken/fetcher.rb in shoryuken-0.0.3 vs lib/shoryuken/fetcher.rb in shoryuken-0.0.4
- old
+ new
@@ -11,21 +11,26 @@
def receive_message(queue, limit)
# AWS limits the batch size by 10
limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit
- Shoryuken::Client.receive_message queue, Shoryuken.options[:aws][:receive_message].to_h.merge(limit: limit)
+ options = Shoryuken.options[:aws][:receive_message].to_h
+ options[:limit] = limit
+ options[:message_attribute_names] ||= []
+ options[:message_attribute_names] << 'shoryuken_class'
+
+ Shoryuken::Client.receive_message queue, options
end
def fetch(queue, available_processors)
watchdog('Fetcher#fetch died') do
started_at = Time.now
- logger.info "Looking for new messages '#{queue}'"
+ logger.debug "Looking for new messages in '#{queue}'"
begin
- batch = !!Shoryuken.workers[queue].get_shoryuken_options['batch']
+ batch = !!(Shoryuken.workers[queue] && Shoryuken.workers[queue].get_shoryuken_options['batch'])
limit = batch ? FETCH_LIMIT : available_processors
if (sqs_msgs = Array(receive_message(queue, limit))).any?
logger.info "Found #{sqs_msgs.size} messages for '#{queue}'"
@@ -36,10 +41,10 @@
sqs_msgs.each { |sqs_msg| @manager.async.assign(queue, sqs_msg) }
end
@manager.async.rebalance_queue_weight!(queue)
else
- logger.info "No message found for '#{queue}'"
+ logger.debug "No message found for '#{queue}'"
@manager.async.pause_queue!(queue)
end
@manager.async.dispatch