lib/phobos/actions/process_message.rb in phobos-1.7.2 vs lib/phobos/actions/process_message.rb in phobos-1.8.0
- old
+ new
@@ -1,8 +1,10 @@
module Phobos
module Actions
class ProcessMessage
+ MAX_SLEEP_INTERVAL = 3
+
include Phobos::Instrumentation
attr_reader :metadata
def initialize(listener:, message:, listener_metadata:)
@@ -36,32 +38,55 @@
instrument('listener.retry_handler_error', error.merge(@metadata)) do
Phobos.logger.error do
{ message: "error processing message, waiting #{interval}s" }.merge(error).merge(@metadata)
end
- sleep interval
+ snooze(interval)
end
- raise Phobos::AbortError if @listener.should_stop?
-
@metadata.merge!(retry_count: retry_count + 1)
retry
end
end
+ def snooze(interval)
+ remaining_interval = interval
+
+ @listener.send_heartbeat_if_necessary
+
+ while remaining_interval.positive?
+ sleep [remaining_interval, MAX_SLEEP_INTERVAL].min
+ remaining_interval -= MAX_SLEEP_INTERVAL
+ @listener.send_heartbeat_if_necessary
+ end
+ end
+
private
def force_encoding(value)
@listener.encoding ? value&.force_encoding(@listener.encoding) : value
end
def process_message(payload)
instrument('listener.process_message', @metadata) do
handler = @listener.handler_class.new
- preprocessed_payload = handler.before_consume(payload)
+ preprocessed_payload = begin
+ handler.before_consume(payload, @metadata)
+ rescue ArgumentError => e
+ Phobos.deprecate("before_consume now expects metadata as second argument, please update your consumer."\
+ " This will not be backwards compatible in the future.")
+ handler.before_consume(payload)
+ end
+ consume_block = Proc.new { handler.consume(preprocessed_payload, @metadata) }
- @listener.handler_class.around_consume(preprocessed_payload, @metadata) do
- handler.consume(preprocessed_payload, @metadata)
+ if @listener.handler_class.respond_to?(:around_consume)
+ # around_consume class method implementation
+ Phobos.deprecate("around_consume has been moved to instance method, please update your consumer."\
+ " This will not be backwards compatible in the future.")
+ @listener.handler_class.around_consume(preprocessed_payload, @metadata, &consume_block)
+ else
+ # around_consume instance method implementation
+ handler.around_consume(preprocessed_payload, @metadata, &consume_block)
end
end
end
end
end