lib/phobos/actions/process_batch.rb in phobos-1.5.0 vs lib/phobos/actions/process_batch.rb in phobos-1.6.0
- old
+ new
@@ -1,60 +1,35 @@
module Phobos
module Actions
class ProcessBatch
include Phobos::Instrumentation
+ attr_reader :metadata
+
def initialize(listener:, batch:, listener_metadata:)
@listener = listener
@batch = batch
@listener_metadata = listener_metadata
+ @metadata = listener_metadata.merge(
+ batch_size: batch.messages.count,
+ partition: batch.partition,
+ offset_lag: batch.offset_lag
+ )
end
def execute
- @batch.messages.each do |message|
- backoff = @listener.create_exponential_backoff
- metadata = @listener_metadata.merge(
- key: message.key,
- partition: message.partition,
- offset: message.offset,
- retry_count: 0
- )
-
- begin
- instrument('listener.process_message', metadata) do |metadata|
- time_elapsed = measure do
- Phobos::Actions::ProcessMessage.new(
- listener: @listener,
- message: message,
- metadata: metadata,
- encoding: @listener.encoding
- ).execute
- end
- metadata.merge!(time_elapsed: time_elapsed)
+ instrument('listener.process_batch', @metadata) do |metadata|
+ time_elapsed = measure do
+ @batch.messages.each do |message|
+ Phobos::Actions::ProcessMessage.new(
+ listener: @listener,
+ message: message,
+ listener_metadata: @listener_metadata
+ ).execute
end
- rescue => e
- retry_count = metadata[:retry_count]
- interval = backoff.interval_at(retry_count).round(2)
-
- error = {
- waiting_time: interval,
- exception_class: e.class.name,
- exception_message: e.message,
- backtrace: e.backtrace
- }
-
- instrument('listener.retry_handler_error', error.merge(metadata)) do
- Phobos.logger.error do
- { message: "error processing message, waiting #{interval}s" }.merge(error).merge(metadata)
- end
-
- sleep interval
- metadata.merge!(retry_count: retry_count + 1)
- end
-
- raise Phobos::AbortError if @listener.should_stop?
- retry
end
+
+ metadata.merge!(time_elapsed: time_elapsed)
end
end
end
end
end