lib/pheme/queue_poller.rb in pheme-0.0.8 vs lib/pheme/queue_poller.rb in pheme-0.0.9
- old
+ new
@@ -5,14 +5,16 @@
def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {})
raise ArgumentError, "must specify non-nil queue_url" unless queue_url.present?
@queue_url = queue_url
@queue_poller = Aws::SQS::QueuePoller.new(queue_url)
@connection_pool_block = connection_pool_block
+ @messages_processed = 0
+ @messages_received = 0
@format = format
@max_messages = max_messages
@poller_configuration = {
- wait_time_seconds: 10, # amount of time a long polling receive call can wait for a mesage before receiving a empty response (which will trigger another polling request)
+ wait_time_seconds: 10, # amount of time a long polling receive call can wait for a message before receiving a empty response (which will trigger another polling request)
idle_timeout: 20, # disconnects poller after 20 seconds of idle time
skip_delete: true, # manually delete messages
}.merge(poller_configuration || {})
if max_messages
@@ -21,42 +23,53 @@
end
end
end
def poll
- Pheme.log(:info, "Long-polling for messages on #{queue_url}")
+ time_start = log_polling_start
with_optional_connection_pool_block do
- queue_poller.poll(poller_configuration) do |message|
- data = parse_message(message)
- begin
- handle(data)
- queue_poller.delete_message(message)
- rescue SignalException => e
- throw :stop_polling
- rescue => e
- Pheme.log(:error, "Exception: #{e.inspect}")
- Pheme.log(:error, e.backtrace.join("\n"))
- Pheme.rollbar(e, "#{self.class} failed to process message", data)
+ queue_poller.poll(poller_configuration) do |queue_message|
+ @messages_received += 1
+ Pheme.logger.tagged(queue_message.message_id) do
+ begin
+ content = parse_body(queue_message)
+ handle(content)
+ queue_poller.delete_message(queue_message)
+ log_delete(queue_message)
+ @messages_processed += 1
+ rescue SignalException
+ throw :stop_polling
+ rescue StandardError => e
+ Pheme.logger.error(e)
+ Pheme.rollbar(e, "#{self.class} failed to process message", content)
+ end
end
end
end
- Pheme.log(:info, "Finished long-polling after #{@poller_configuration[:idle_timeout]} seconds.")
+ log_polling_end(time_start)
end
- def parse_message(message)
- Pheme.log(:info, "Received JSON payload: #{message.body}")
- content = get_content(JSON.parse(message.body))
+ # returns queue_message.body as hash,
+ # stores and parses get_content to body[:content]
+ def parse_body(queue_message)
+ message_body = JSON.parse(queue_message.body)
+ raw_content = get_content(message_body)
+
case format
when :csv
- parse_csv(content)
+ parsed_content = parse_csv(raw_content)
when :json
- parse_json(content)
+ parsed_content = parse_json(raw_content)
else
method_name = "parse_#{format}".to_sym
- raise ArgumentError.new("Unknown format #{format}") unless self.respond_to?(method_name)
- self.__send__(method_name, content)
+ raise ArgumentError, "Unknown format #{format}" unless respond_to?(method_name)
+ parsed_content = __send__(method_name, raw_content)
end
+
+ body = format == :csv ? raw_content : parsed_content
+ log_message_received(queue_message, body)
+ parsed_content
end
def get_content(body)
body['Message']
end
@@ -66,23 +79,69 @@
parsed_body.map{ |item| RecursiveOpenStruct.new(item, recurse_over_arrays: true) }
end
def parse_json(message_contents)
parsed_body = JSON.parse(message_contents)
- RecursiveOpenStruct.new({wrapper: parsed_body}, recurse_over_arrays: true).wrapper
+ RecursiveOpenStruct.new({ wrapper: parsed_body }, recurse_over_arrays: true).wrapper
end
- def handle(message)
+ def handle(_message)
raise NotImplementedError
end
- private
+ private
- def with_optional_connection_pool_block(&blk)
+ def with_optional_connection_pool_block
if connection_pool_block
- ActiveRecord::Base.connection_pool.with_connection { blk.call }
+ ActiveRecord::Base.connection_pool.with_connection { yield }
else
- blk.call
+ yield
end
+ end
+
+ def log_polling_start
+ time_start = Time.now
+ Pheme.logger.info({
+ message: "Start long-polling #{queue_url}",
+ type: self.class.name,
+ queue_url: queue_url,
+ format: format,
+ max_messages: max_messages,
+ connection_pool_block: connection_pool_block,
+ poller_configuration: poller_configuration,
+ }.to_json)
+ time_start
+ end
+
+ def log_polling_end(time_start)
+ time_end = Time.now
+ elapsed = time_end - time_start
+ Pheme.logger.info({
+ message: "Finished long-polling #{queue_url}, duration: #{elapsed.round(2)} seconds.",
+ queue_url: queue_url,
+ format: format,
+ messages_received: @messages_received,
+ messages_processed: @messages_processed,
+ duration: elapsed.round(2),
+ start_time: time_start.utc.iso8601,
+ end_time: time_end.utc.iso8601,
+ }.to_json)
+ end
+
+ def log_delete(queue_message)
+ Pheme.logger.info({
+ message: "Deleted message #{queue_message.message_id}",
+ message_id: queue_message.message_id,
+ queue_url: queue_url,
+ }.to_json)
+ end
+
+ def log_message_received(queue_message, body)
+ Pheme.logger.info({
+ message: "Received message #{queue_message.message_id}",
+ message_id: queue_message.message_id,
+ queue_url: queue_url,
+ body: body,
+ }.to_json)
end
end
end