lib/phobos/actions/process_message.rb in phobos-1.8.0 vs lib/phobos/actions/process_message.rb in phobos-1.8.1
- old
+ new
@@ -1,5 +1,7 @@
+# frozen_string_literal: true
+
module Phobos
module Actions
class ProcessMessage
MAX_SLEEP_INTERVAL = 3
@@ -17,35 +19,16 @@
retry_count: 0
)
end
def execute
- backoff = @listener.create_exponential_backoff
payload = force_encoding(@message.value)
begin
process_message(payload)
- rescue => e
- retry_count = @metadata[:retry_count]
- interval = backoff.interval_at(retry_count).round(2)
-
- error = {
- waiting_time: interval,
- exception_class: e.class.name,
- exception_message: e.message,
- backtrace: e.backtrace
- }
-
- instrument('listener.retry_handler_error', error.merge(@metadata)) do
- Phobos.logger.error do
- { message: "error processing message, waiting #{interval}s" }.merge(error).merge(@metadata)
- end
-
- snooze(interval)
- end
-
- @metadata.merge!(retry_count: retry_count + 1)
+ rescue StandardError => e
+ handle_error(e)
retry
end
end
def snooze(interval)
@@ -67,28 +50,68 @@
end
def process_message(payload)
instrument('listener.process_message', @metadata) do
handler = @listener.handler_class.new
- preprocessed_payload = begin
- handler.before_consume(payload, @metadata)
- rescue ArgumentError => e
- Phobos.deprecate("before_consume now expects metadata as second argument, please update your consumer."\
- " This will not be backwards compatible in the future.")
- handler.before_consume(payload)
- end
- consume_block = Proc.new { handler.consume(preprocessed_payload, @metadata) }
+ preprocessed_payload = before_consume(handler, payload)
+ consume_block = proc { handler.consume(preprocessed_payload, @metadata) }
+
if @listener.handler_class.respond_to?(:around_consume)
# around_consume class method implementation
- Phobos.deprecate("around_consume has been moved to instance method, please update your consumer."\
- " This will not be backwards compatible in the future.")
+ Phobos.deprecate('around_consume has been moved to instance method, '\
+ 'please update your consumer. This will not be backwards compatible in the future.')
@listener.handler_class.around_consume(preprocessed_payload, @metadata, &consume_block)
else
# around_consume instance method implementation
handler.around_consume(preprocessed_payload, @metadata, &consume_block)
end
end
+ end
+
+ def before_consume(handler, payload)
+ handler.before_consume(payload, @metadata)
+ rescue ArgumentError
+ Phobos.deprecate('before_consume now expects metadata as second argument, '\
+ 'please update your consumer. This will not be backwards compatible in the future.')
+ handler.before_consume(payload)
+ end
+
+ def handle_error(error)
+ error_hash = {
+ waiting_time: backoff_interval,
+ exception_class: error.class.name,
+ exception_message: error.message,
+ backtrace: error.backtrace
+ }
+
+ instrument('listener.retry_handler_error', error_hash.merge(@metadata)) do
+ Phobos.logger.error do
+ { message: "error processing message, waiting #{backoff_interval}s" }
+ .merge(error_hash)
+ .merge(@metadata)
+ end
+
+ snooze(backoff_interval)
+ end
+
+ increment_retry_count
+ end
+
+ def retry_count
+ @metadata[:retry_count]
+ end
+
+ def increment_retry_count
+ @metadata[:retry_count] = retry_count + 1
+ end
+
+ def backoff
+ @backoff ||= @listener.create_exponential_backoff
+ end
+
+ def backoff_interval
+ backoff.interval_at(retry_count).round(2)
end
end
end
end