lib/pheme/queue_poller.rb in pheme-0.0.11 vs lib/pheme/queue_poller.rb in pheme-1.0.0
- old
+ new
@@ -1,13 +1,13 @@
module Pheme
class QueuePoller
attr_accessor :queue_url, :queue_poller, :connection_pool_block, :format, :max_messages, :poller_configuration
- def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {})
+ def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil)
raise ArgumentError, "must specify non-nil queue_url" unless queue_url.present?
@queue_url = queue_url
- @queue_poller = Aws::SQS::QueuePoller.new(queue_url)
+ @queue_poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_client)
@connection_pool_block = connection_pool_block
@messages_processed = 0
@messages_received = 0
@format = format
@max_messages = max_messages
@@ -30,11 +30,12 @@
queue_poller.poll(poller_configuration) do |queue_message|
@messages_received += 1
Pheme.logger.tagged(queue_message.message_id) do
begin
content = parse_body(queue_message)
- handle(content)
+ metadata = parse_metadata(queue_message)
+ handle(content, metadata)
queue_poller.delete_message(queue_message)
log_delete(queue_message)
@messages_processed += 1
rescue SignalException
throw :stop_polling
@@ -71,10 +72,15 @@
log_message_received(queue_message, body)
parsed_content
end
+ def parse_metadata(queue_message)
+ message_body = JSON.parse(queue_message.body)
+ { timestamp: message_body['Timestamp'] }
+ end
+
def get_metadata(message_body)
message_body.except('Message', 'Records')
end
def get_content(body)
@@ -89,10 +95,10 @@
def parse_json(message_contents)
parsed_body = JSON.parse(message_contents)
RecursiveOpenStruct.new({ wrapper: parsed_body }, recurse_over_arrays: true).wrapper
end
- def handle(_message)
+ def handle(_message, _metadata)
raise NotImplementedError
end
private