lib/phobos/actions/process_batch.rb in phobos-1.5.0 vs lib/phobos/actions/process_batch.rb in phobos-1.6.0

- old
+ new

@@ -1,60 +1,35 @@ module Phobos module Actions class ProcessBatch include Phobos::Instrumentation + attr_reader :metadata + def initialize(listener:, batch:, listener_metadata:) @listener = listener @batch = batch @listener_metadata = listener_metadata + @metadata = listener_metadata.merge( + batch_size: batch.messages.count, + partition: batch.partition, + offset_lag: batch.offset_lag + ) end def execute - @batch.messages.each do |message| - backoff = @listener.create_exponential_backoff - metadata = @listener_metadata.merge( - key: message.key, - partition: message.partition, - offset: message.offset, - retry_count: 0 - ) - - begin - instrument('listener.process_message', metadata) do |metadata| - time_elapsed = measure do - Phobos::Actions::ProcessMessage.new( - listener: @listener, - message: message, - metadata: metadata, - encoding: @listener.encoding - ).execute - end - metadata.merge!(time_elapsed: time_elapsed) + instrument('listener.process_batch', @metadata) do |metadata| + time_elapsed = measure do + @batch.messages.each do |message| + Phobos::Actions::ProcessMessage.new( + listener: @listener, + message: message, + listener_metadata: @listener_metadata + ).execute end - rescue => e - retry_count = metadata[:retry_count] - interval = backoff.interval_at(retry_count).round(2) - - error = { - waiting_time: interval, - exception_class: e.class.name, - exception_message: e.message, - backtrace: e.backtrace - } - - instrument('listener.retry_handler_error', error.merge(metadata)) do - Phobos.logger.error do - { message: "error processing message, waiting #{interval}s" }.merge(error).merge(metadata) - end - - sleep interval - metadata.merge!(retry_count: retry_count + 1) - end - - raise Phobos::AbortError if @listener.should_stop? - retry end + + metadata.merge!(time_elapsed: time_elapsed) end end end end end