lib/pheme/queue_poller.rb in pheme-0.0.11 vs lib/pheme/queue_poller.rb in pheme-1.0.0

- old
+ new

@@ -1,13 +1,13 @@ module Pheme class QueuePoller attr_accessor :queue_url, :queue_poller, :connection_pool_block, :format, :max_messages, :poller_configuration - def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}) + def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil) raise ArgumentError, "must specify non-nil queue_url" unless queue_url.present? @queue_url = queue_url - @queue_poller = Aws::SQS::QueuePoller.new(queue_url) + @queue_poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_client) @connection_pool_block = connection_pool_block @messages_processed = 0 @messages_received = 0 @format = format @max_messages = max_messages @@ -30,11 +30,12 @@ queue_poller.poll(poller_configuration) do |queue_message| @messages_received += 1 Pheme.logger.tagged(queue_message.message_id) do begin content = parse_body(queue_message) - handle(content) + metadata = parse_metadata(queue_message) + handle(content, metadata) queue_poller.delete_message(queue_message) log_delete(queue_message) @messages_processed += 1 rescue SignalException throw :stop_polling @@ -71,10 +72,15 @@ log_message_received(queue_message, body) parsed_content end + def parse_metadata(queue_message) + message_body = JSON.parse(queue_message.body) + { timestamp: message_body['Timestamp'] } + end + def get_metadata(message_body) message_body.except('Message', 'Records') end def get_content(body) @@ -89,10 +95,10 @@ def parse_json(message_contents) parsed_body = JSON.parse(message_contents) RecursiveOpenStruct.new({ wrapper: parsed_body }, recurse_over_arrays: true).wrapper end - def handle(_message) + def handle(_message, _metadata) raise NotImplementedError end private