lib/deimos/consume/batch_consumption.rb in deimos-ruby-1.16.3 vs lib/deimos/consume/batch_consumption.rb in deimos-ruby-1.16.4

- old
+ new

@@ -7,11 +7,13 @@ # of messages to be handled at once module BatchConsumption extend ActiveSupport::Concern include Phobos::BatchHandler - # :nodoc: + # @param batch [Array<String>] + # @param metadata [Hash] + # @return [void] def around_consume_batch(batch, metadata) payloads = [] _with_span do benchmark = Benchmark.measure do if self.class.config[:key_configured] @@ -34,16 +36,18 @@ end # Consume a batch of incoming messages. # @param _payloads [Array<Phobos::BatchMessage>] # @param _metadata [Hash] + # @return [void] def consume_batch(_payloads, _metadata) raise NotImplementedError end protected + # @!visibility private def _received_batch(payloads, metadata) Deimos.config.logger.info( message: 'Got Kafka batch event', message_ids: _payload_identifiers(payloads, metadata), metadata: metadata.except(:keys) @@ -68,10 +72,11 @@ if payloads.present? payloads.each { |payload| _report_time_delayed(payload, metadata) } end end + # @!visibility private # @param exception [Throwable] # @param payloads [Array<Hash>] # @param metadata [Hash] def _handle_batch_error(exception, payloads, metadata) Deimos.config.metrics&.increment( @@ -89,10 +94,11 @@ error: exception.backtrace ) _error(exception, payloads, metadata) end + # @!visibility private # @param time_taken [Float] # @param payloads [Array<Hash>] # @param metadata [Hash] def _handle_batch_success(time_taken, payloads, metadata) Deimos.config.metrics&.histogram('handler', @@ -120,9 +126,10 @@ time_elapsed: time_taken, metadata: metadata.except(:keys) ) end + # @!visibility private # Get payload identifiers (key and message_id if present) for logging. # @param payloads [Array<Hash>] # @param metadata [Hash] # @return [Array<Array>] the identifiers. def _payload_identifiers(payloads, metadata)