lib/pheme/queue_poller.rb in pheme-0.0.8 vs lib/pheme/queue_poller.rb in pheme-0.0.9

- old
+ new

@@ -5,14 +5,16 @@ def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}) raise ArgumentError, "must specify non-nil queue_url" unless queue_url.present? @queue_url = queue_url @queue_poller = Aws::SQS::QueuePoller.new(queue_url) @connection_pool_block = connection_pool_block + @messages_processed = 0 + @messages_received = 0 @format = format @max_messages = max_messages @poller_configuration = { - wait_time_seconds: 10, # amount of time a long polling receive call can wait for a mesage before receiving a empty response (which will trigger another polling request) + wait_time_seconds: 10, # amount of time a long polling receive call can wait for a message before receiving a empty response (which will trigger another polling request) idle_timeout: 20, # disconnects poller after 20 seconds of idle time skip_delete: true, # manually delete messages }.merge(poller_configuration || {}) if max_messages @@ -21,42 +23,53 @@ end end end def poll - Pheme.log(:info, "Long-polling for messages on #{queue_url}") + time_start = log_polling_start with_optional_connection_pool_block do - queue_poller.poll(poller_configuration) do |message| - data = parse_message(message) - begin - handle(data) - queue_poller.delete_message(message) - rescue SignalException => e - throw :stop_polling - rescue => e - Pheme.log(:error, "Exception: #{e.inspect}") - Pheme.log(:error, e.backtrace.join("\n")) - Pheme.rollbar(e, "#{self.class} failed to process message", data) + queue_poller.poll(poller_configuration) do |queue_message| + @messages_received += 1 + Pheme.logger.tagged(queue_message.message_id) do + begin + content = parse_body(queue_message) + handle(content) + queue_poller.delete_message(queue_message) + log_delete(queue_message) + @messages_processed += 1 + rescue SignalException + throw :stop_polling + rescue StandardError => e + Pheme.logger.error(e) + Pheme.rollbar(e, "#{self.class} failed to process message", content) + end end end end - Pheme.log(:info, "Finished long-polling after #{@poller_configuration[:idle_timeout]} seconds.") + log_polling_end(time_start) end - def parse_message(message) - Pheme.log(:info, "Received JSON payload: #{message.body}") - content = get_content(JSON.parse(message.body)) + # returns queue_message.body as hash, + # stores and parses get_content to body[:content] + def parse_body(queue_message) + message_body = JSON.parse(queue_message.body) + raw_content = get_content(message_body) + case format when :csv - parse_csv(content) + parsed_content = parse_csv(raw_content) when :json - parse_json(content) + parsed_content = parse_json(raw_content) else method_name = "parse_#{format}".to_sym - raise ArgumentError.new("Unknown format #{format}") unless self.respond_to?(method_name) - self.__send__(method_name, content) + raise ArgumentError, "Unknown format #{format}" unless respond_to?(method_name) + parsed_content = __send__(method_name, raw_content) end + + body = format == :csv ? raw_content : parsed_content + log_message_received(queue_message, body) + parsed_content end def get_content(body) body['Message'] end @@ -66,23 +79,69 @@ parsed_body.map{ |item| RecursiveOpenStruct.new(item, recurse_over_arrays: true) } end def parse_json(message_contents) parsed_body = JSON.parse(message_contents) - RecursiveOpenStruct.new({wrapper: parsed_body}, recurse_over_arrays: true).wrapper + RecursiveOpenStruct.new({ wrapper: parsed_body }, recurse_over_arrays: true).wrapper end - def handle(message) + def handle(_message) raise NotImplementedError end - private + private - def with_optional_connection_pool_block(&blk) + def with_optional_connection_pool_block if connection_pool_block - ActiveRecord::Base.connection_pool.with_connection { blk.call } + ActiveRecord::Base.connection_pool.with_connection { yield } else - blk.call + yield end + end + + def log_polling_start + time_start = Time.now + Pheme.logger.info({ + message: "Start long-polling #{queue_url}", + type: self.class.name, + queue_url: queue_url, + format: format, + max_messages: max_messages, + connection_pool_block: connection_pool_block, + poller_configuration: poller_configuration, + }.to_json) + time_start + end + + def log_polling_end(time_start) + time_end = Time.now + elapsed = time_end - time_start + Pheme.logger.info({ + message: "Finished long-polling #{queue_url}, duration: #{elapsed.round(2)} seconds.", + queue_url: queue_url, + format: format, + messages_received: @messages_received, + messages_processed: @messages_processed, + duration: elapsed.round(2), + start_time: time_start.utc.iso8601, + end_time: time_end.utc.iso8601, + }.to_json) + end + + def log_delete(queue_message) + Pheme.logger.info({ + message: "Deleted message #{queue_message.message_id}", + message_id: queue_message.message_id, + queue_url: queue_url, + }.to_json) + end + + def log_message_received(queue_message, body) + Pheme.logger.info({ + message: "Received message #{queue_message.message_id}", + message_id: queue_message.message_id, + queue_url: queue_url, + body: body, + }.to_json) end end end