lib/pheme/queue_poller.rb in pheme-3.4.0 vs lib/pheme/queue_poller.rb in pheme-4.0.0

- old
+ new

@@ -1,8 +1,9 @@ require_relative 'compression' module Pheme + # rubocop:disable Metrics/ClassLength class QueuePoller include Compression attr_accessor :queue_url, :queue_poller, :connection_pool_block, :format, :max_messages, :poller_configuration @@ -49,19 +50,21 @@ throw :stop_polling if stats.received_message_count >= max_messages end end end + # rubocop:disable Metrics/AbcSize def poll time_start = log_polling_start queue_poller.poll(poller_configuration) do |queue_message| @messages_received += 1 Pheme.logger.tagged(queue_message.message_id) do begin content = parse_body(queue_message) metadata = parse_metadata(queue_message) - with_optional_connection_pool_block { handle(content, metadata) } + message_attributes = parse_message_attributes(queue_message) + with_optional_connection_pool_block { handle(content, metadata, message_attributes) } queue_poller.delete_message(queue_message) log_delete(queue_message) @messages_processed += 1 rescue SignalException throw :stop_polling @@ -71,10 +74,11 @@ end end end log_polling_end(time_start) end + # rubocop:enable Metrics/AbcSize # returns queue_message.body as hash, # stores and parses get_content to body[:content] def parse_body(queue_message) message_body = JSON.parse(queue_message.body) @@ -103,10 +107,19 @@ def parse_metadata(queue_message) message_body = JSON.parse(queue_message.body) { timestamp: message_body['Timestamp'], topic_arn: message_body['TopicArn'] } end + def parse_message_attributes(queue_message) + message_attributes = {} + queue_message.message_attributes&.each do |key, value| + message_attributes[key.to_sym] = coerce_message_attribute(value) + end + + message_attributes + end + def get_metadata(message_body) message_body.except('Message', 'Records') end def get_content(body) @@ -121,22 +134,38 @@ def parse_json(message_contents) parsed_body = JSON.parse(message_contents) RecursiveOpenStruct.new({ wrapper: parsed_body }, recurse_over_arrays: true).wrapper end - def handle(message, metadata) + def handle(message, metadata, message_attributes) if @message_handler - @message_handler.new(message: message, metadata: metadata).handle + @message_handler.new(message: message, metadata: metadata, message_attributes: message_attributes).handle elsif @block_message_handler - @block_message_handler.call(message, metadata) + @block_message_handler.call(message, metadata, message_attributes) else raise NotImplementedError end end private + def coerce_message_attribute(value) + case value['data_type'] + when 'String' + value['string_value'] + when 'Number' + JSON.parse(value['string_value']) + when 'String.Array' + JSON.parse(value['string_value']) + when 'Binary' + value['binary_value'] + else + Pheme.logger.info("Unsupported custom data type") + value["binary_value"] || value["string_value"] + end + end + def with_optional_connection_pool_block if connection_pool_block ActiveRecord::Base.connection_pool.with_connection { yield } else yield @@ -190,6 +219,7 @@ queue_url: queue_url, body: body, }.to_json) end end + # rubocop:enable Metrics/ClassLength end