lib/phobos/actions/process_message.rb in phobos-1.6.1 vs lib/phobos/actions/process_message.rb in phobos-1.7.0
- old
+ new
@@ -6,11 +6,10 @@
attr_reader :metadata
def initialize(listener:, message:, listener_metadata:)
@listener = listener
@message = message
- @listener_metadata = listener_metadata
@metadata = listener_metadata.merge(
key: message.key,
partition: message.partition,
offset: message.offset,
retry_count: 0
@@ -54,23 +53,16 @@
def force_encoding(value)
@listener.encoding ? value.force_encoding(@listener.encoding) : value
end
def process_message(payload)
- instrument('listener.process_message', @metadata) do |metadata|
- consume_result = nil
- time_elapsed = measure do
- handler = @listener.handler_class.new
- preprocessed_payload = handler.before_consume(payload)
+ instrument('listener.process_message', @metadata) do
+ handler = @listener.handler_class.new
+ preprocessed_payload = handler.before_consume(payload)
- @listener.handler_class.around_consume(preprocessed_payload, @metadata) do
- consume_result = handler.consume(preprocessed_payload, @metadata)
- end
+ @listener.handler_class.around_consume(preprocessed_payload, @metadata) do
+ handler.consume(preprocessed_payload, @metadata)
end
-
- metadata.merge!(time_elapsed: time_elapsed)
-
- consume_result
end
end
end
end
end