lib/shoryuken/fetcher.rb in shoryuken-0.0.3 vs lib/shoryuken/fetcher.rb in shoryuken-0.0.4

- old
+ new

@@ -11,21 +11,26 @@ def receive_message(queue, limit) # AWS limits the batch size by 10 limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit - Shoryuken::Client.receive_message queue, Shoryuken.options[:aws][:receive_message].to_h.merge(limit: limit) + options = Shoryuken.options[:aws][:receive_message].to_h + options[:limit] = limit + options[:message_attribute_names] ||= [] + options[:message_attribute_names] << 'shoryuken_class' + + Shoryuken::Client.receive_message queue, options end def fetch(queue, available_processors) watchdog('Fetcher#fetch died') do started_at = Time.now - logger.info "Looking for new messages '#{queue}'" + logger.debug "Looking for new messages in '#{queue}'" begin - batch = !!Shoryuken.workers[queue].get_shoryuken_options['batch'] + batch = !!(Shoryuken.workers[queue] && Shoryuken.workers[queue].get_shoryuken_options['batch']) limit = batch ? FETCH_LIMIT : available_processors if (sqs_msgs = Array(receive_message(queue, limit))).any? logger.info "Found #{sqs_msgs.size} messages for '#{queue}'" @@ -36,10 +41,10 @@ sqs_msgs.each { |sqs_msg| @manager.async.assign(queue, sqs_msg) } end @manager.async.rebalance_queue_weight!(queue) else - logger.info "No message found for '#{queue}'" + logger.debug "No message found for '#{queue}'" @manager.async.pause_queue!(queue) end @manager.async.dispatch