lib/phobos/actions/process_message.rb in phobos-1.8.1 vs lib/phobos/actions/process_message.rb in phobos-1.8.2.pre.beta1
- old
+ new
@@ -1,14 +1,14 @@
# frozen_string_literal: true
+require 'phobos/processor'
+
module Phobos
module Actions
class ProcessMessage
- MAX_SLEEP_INTERVAL = 3
+ include Phobos::Processor
- include Phobos::Instrumentation
-
attr_reader :metadata
def initialize(listener:, message:, listener_metadata:)
@listener = listener
@message = message
@@ -24,33 +24,18 @@
payload = force_encoding(@message.value)
begin
process_message(payload)
rescue StandardError => e
- handle_error(e)
+ handle_error(e, 'listener.retry_handler_error',
+ "error processing message, waiting #{backoff_interval}s")
retry
end
end
- def snooze(interval)
- remaining_interval = interval
-
- @listener.send_heartbeat_if_necessary
-
- while remaining_interval.positive?
- sleep [remaining_interval, MAX_SLEEP_INTERVAL].min
- remaining_interval -= MAX_SLEEP_INTERVAL
- @listener.send_heartbeat_if_necessary
- end
- end
-
private
- def force_encoding(value)
- @listener.encoding ? value&.force_encoding(@listener.encoding) : value
- end
-
def process_message(payload)
instrument('listener.process_message', @metadata) do
handler = @listener.handler_class.new
preprocessed_payload = before_consume(handler, payload)
@@ -72,46 +57,9 @@
handler.before_consume(payload, @metadata)
rescue ArgumentError
Phobos.deprecate('before_consume now expects metadata as second argument, '\
'please update your consumer. This will not be backwards compatible in the future.')
handler.before_consume(payload)
- end
-
- def handle_error(error)
- error_hash = {
- waiting_time: backoff_interval,
- exception_class: error.class.name,
- exception_message: error.message,
- backtrace: error.backtrace
- }
-
- instrument('listener.retry_handler_error', error_hash.merge(@metadata)) do
- Phobos.logger.error do
- { message: "error processing message, waiting #{backoff_interval}s" }
- .merge(error_hash)
- .merge(@metadata)
- end
-
- snooze(backoff_interval)
- end
-
- increment_retry_count
- end
-
- def retry_count
- @metadata[:retry_count]
- end
-
- def increment_retry_count
- @metadata[:retry_count] = retry_count + 1
- end
-
- def backoff
- @backoff ||= @listener.create_exponential_backoff
- end
-
- def backoff_interval
- backoff.interval_at(retry_count).round(2)
end
end
end
end