lib/pheme/queue_poller.rb in pheme-3.4.0 vs lib/pheme/queue_poller.rb in pheme-4.0.0
- old
+ new
@@ -1,8 +1,9 @@
require_relative 'compression'
module Pheme
+ # rubocop:disable Metrics/ClassLength
class QueuePoller
include Compression
attr_accessor :queue_url, :queue_poller, :connection_pool_block, :format, :max_messages, :poller_configuration
@@ -49,19 +50,21 @@
throw :stop_polling if stats.received_message_count >= max_messages
end
end
end
+ # rubocop:disable Metrics/AbcSize
def poll
time_start = log_polling_start
queue_poller.poll(poller_configuration) do |queue_message|
@messages_received += 1
Pheme.logger.tagged(queue_message.message_id) do
begin
content = parse_body(queue_message)
metadata = parse_metadata(queue_message)
- with_optional_connection_pool_block { handle(content, metadata) }
+ message_attributes = parse_message_attributes(queue_message)
+ with_optional_connection_pool_block { handle(content, metadata, message_attributes) }
queue_poller.delete_message(queue_message)
log_delete(queue_message)
@messages_processed += 1
rescue SignalException
throw :stop_polling
@@ -71,10 +74,11 @@
end
end
end
log_polling_end(time_start)
end
+ # rubocop:enable Metrics/AbcSize
# returns queue_message.body as hash,
# stores and parses get_content to body[:content]
def parse_body(queue_message)
message_body = JSON.parse(queue_message.body)
@@ -103,10 +107,19 @@
def parse_metadata(queue_message)
message_body = JSON.parse(queue_message.body)
{ timestamp: message_body['Timestamp'], topic_arn: message_body['TopicArn'] }
end
+ def parse_message_attributes(queue_message)
+ message_attributes = {}
+ queue_message.message_attributes&.each do |key, value|
+ message_attributes[key.to_sym] = coerce_message_attribute(value)
+ end
+
+ message_attributes
+ end
+
def get_metadata(message_body)
message_body.except('Message', 'Records')
end
def get_content(body)
@@ -121,22 +134,38 @@
def parse_json(message_contents)
parsed_body = JSON.parse(message_contents)
RecursiveOpenStruct.new({ wrapper: parsed_body }, recurse_over_arrays: true).wrapper
end
- def handle(message, metadata)
+ def handle(message, metadata, message_attributes)
if @message_handler
- @message_handler.new(message: message, metadata: metadata).handle
+ @message_handler.new(message: message, metadata: metadata, message_attributes: message_attributes).handle
elsif @block_message_handler
- @block_message_handler.call(message, metadata)
+ @block_message_handler.call(message, metadata, message_attributes)
else
raise NotImplementedError
end
end
private
+ def coerce_message_attribute(value)
+ case value['data_type']
+ when 'String'
+ value['string_value']
+ when 'Number'
+ JSON.parse(value['string_value'])
+ when 'String.Array'
+ JSON.parse(value['string_value'])
+ when 'Binary'
+ value['binary_value']
+ else
+ Pheme.logger.info("Unsupported custom data type")
+ value["binary_value"] || value["string_value"]
+ end
+ end
+
def with_optional_connection_pool_block
if connection_pool_block
ActiveRecord::Base.connection_pool.with_connection { yield }
else
yield
@@ -190,6 +219,7 @@
queue_url: queue_url,
body: body,
}.to_json)
end
end
+ # rubocop:enable Metrics/ClassLength
end