lib/alephant/publisher/queue/sqs_helper/queue.rb in alephant-publisher-queue-2.5.0 vs lib/alephant/publisher/queue/sqs_helper/queue.rb in alephant-publisher-queue-2.6.0
- old
+ new
@@ -1,6 +1,6 @@
-require "aws-sdk"
+require "aws-sdk-sqs"
require "alephant/logger"
module Alephant
module Publisher
module Queue
@@ -25,11 +25,11 @@
@wait_time = wait_time
log_queue_creation queue.url, archiver, timeout
end
def message
- receive.tap { |m| process(m) unless m.nil? }
+ receive.tap { |m| process(m) }
end
private
def log_queue_creation(queue_url, archiver, timeout)
@@ -41,17 +41,20 @@
"method" => "#{self.class}#initialize"
)
end
def process(m)
+ return unless m.size > 0
+
logger.metric "MessagesReceived"
logger.info(
"event" => "QueueMessageReceived",
- "messageId" => m.id,
+ "messageId" => m.first.message_id,
"method" => "#{self.class}#process"
)
- archive m
+ # @TODO: Look at archiver as should support message from collection.
+ archive m.first
end
def archive(m)
archiver.see(m) unless archiver.nil?
rescue StandardError => e
@@ -64,12 +67,13 @@
"method" => "#{self.class}#archive"
)
end
def receive
- queue.receive_message(
- :visibility_timeout => timeout,
- :wait_time_seconds => wait_time
+ queue.receive_messages(
+ :visibility_timeout => timeout,
+ :wait_time_seconds => wait_time,
+ :max_number_of_messages => 1
)
end
end
end
end