lib/deimos/consume/batch_consumption.rb in deimos-ruby-1.16.3 vs lib/deimos/consume/batch_consumption.rb in deimos-ruby-1.16.4
- old
+ new
@@ -7,11 +7,13 @@
# of messages to be handled at once
module BatchConsumption
extend ActiveSupport::Concern
include Phobos::BatchHandler
- # :nodoc:
+ # @param batch [Array<String>]
+ # @param metadata [Hash]
+ # @return [void]
def around_consume_batch(batch, metadata)
payloads = []
_with_span do
benchmark = Benchmark.measure do
if self.class.config[:key_configured]
@@ -34,16 +36,18 @@
end
# Consume a batch of incoming messages.
# @param _payloads [Array<Phobos::BatchMessage>]
# @param _metadata [Hash]
+ # @return [void]
def consume_batch(_payloads, _metadata)
raise NotImplementedError
end
protected
+ # @!visibility private
def _received_batch(payloads, metadata)
Deimos.config.logger.info(
message: 'Got Kafka batch event',
message_ids: _payload_identifiers(payloads, metadata),
metadata: metadata.except(:keys)
@@ -68,10 +72,11 @@
if payloads.present?
payloads.each { |payload| _report_time_delayed(payload, metadata) }
end
end
+ # @!visibility private
# @param exception [Throwable]
# @param payloads [Array<Hash>]
# @param metadata [Hash]
def _handle_batch_error(exception, payloads, metadata)
Deimos.config.metrics&.increment(
@@ -89,10 +94,11 @@
error: exception.backtrace
)
_error(exception, payloads, metadata)
end
+ # @!visibility private
# @param time_taken [Float]
# @param payloads [Array<Hash>]
# @param metadata [Hash]
def _handle_batch_success(time_taken, payloads, metadata)
Deimos.config.metrics&.histogram('handler',
@@ -120,9 +126,10 @@
time_elapsed: time_taken,
metadata: metadata.except(:keys)
)
end
+ # @!visibility private
# Get payload identifiers (key and message_id if present) for logging.
# @param payloads [Array<Hash>]
# @param metadata [Hash]
# @return [Array<Array>] the identifiers.
def _payload_identifiers(payloads, metadata)