lib/deimos/batch_consumer.rb in deimos-ruby-1.7.0.pre.beta1 vs lib/deimos/batch_consumer.rb in deimos-ruby-1.8.0.pre.beta1

- old
+ new

@@ -1,147 +1,7 @@ # frozen_string_literal: true -require 'deimos/base_consumer' -require 'phobos/batch_handler' - module Deimos - # Class to consume batches of messages in a topic - # Note: According to the docs, instances of your handler will be created - # for every incoming batch of messages. This class should be lightweight. - class BatchConsumer < BaseConsumer - include Phobos::BatchHandler - - # :nodoc: - def around_consume_batch(batch, metadata) - payloads = [] - benchmark = Benchmark.measure do - if self.class.config[:key_configured] - metadata[:keys] = batch.map do |message| - decode_key(message.key) - end - end - - payloads = batch.map do |message| - message.payload ? self.class.decoder.decode(message.payload) : nil - end - _received_batch(payloads, metadata) - _with_span do - yield payloads, metadata - end - end - _handle_success(benchmark.real, payloads, metadata) - rescue StandardError => e - _handle_error(e, payloads, metadata) - end - - # Consume a batch of incoming messages. - # @param _payloads [Array<Phobos::BatchMessage>] - # @param _metadata [Hash] - def consume_batch(_payloads, _metadata) - raise NotImplementedError - end - - protected - - def _received_batch(payloads, metadata) - Deimos.config.logger.info( - message: 'Got Kafka batch event', - message_ids: _payload_identifiers(payloads, metadata), - metadata: metadata.except(:keys) - ) - Deimos.config.logger.debug( - message: 'Kafka batch event payloads', - payloads: payloads - ) - Deimos.config.metrics&.increment( - 'handler', - tags: %W( - status:batch_received - topic:#{metadata[:topic]} - )) - Deimos.config.metrics&.increment( - 'handler', - by: metadata['batch_size'], - tags: %W( - status:received - topic:#{metadata[:topic]} - )) - if payloads.present? - payloads.each { |payload| _report_time_delayed(payload, metadata) } - end - end - - # @param exception [Throwable] - # @param payloads [Array<Hash>] - # @param metadata [Hash] - def _handle_error(exception, payloads, metadata) - Deimos.config.metrics&.increment( - 'handler', - tags: %W( - status:batch_error - topic:#{metadata[:topic]} - )) - Deimos.config.logger.warn( - message: 'Error consuming message batch', - handler: self.class.name, - metadata: metadata.except(:keys), - message_ids: _payload_identifiers(payloads, metadata), - error_message: exception.message, - error: exception.backtrace - ) - super - end - - # @param time_taken [Float] - # @param payloads [Array<Hash>] - # @param metadata [Hash] - def _handle_success(time_taken, payloads, metadata) - Deimos.config.metrics&.histogram('handler', time_taken, tags: %W( - time:consume_batch - topic:#{metadata[:topic]} - )) - Deimos.config.metrics&.increment( - 'handler', - tags: %W( - status:batch_success - topic:#{metadata[:topic]} - )) - Deimos.config.metrics&.increment( - 'handler', - by: metadata['batch_size'], - tags: %W( - status:success - topic:#{metadata[:topic]} - )) - Deimos.config.logger.info( - message: 'Finished processing Kafka batch event', - message_ids: _payload_identifiers(payloads, metadata), - time_elapsed: time_taken, - metadata: metadata.except(:keys) - ) - end - - # Get payload identifiers (key and message_id if present) for logging. - # @param payloads [Array<Hash>] - # @param metadata [Hash] - # @return [Hash] the identifiers. - def _payload_identifiers(payloads, metadata) - message_ids = payloads&.map do |payload| - if payload.is_a?(Hash) && payload.key?('message_id') - payload['message_id'] - end - end - - # Payloads may be nil if preprocessing failed - messages = payloads || metadata[:keys] || [] - - messages.zip(metadata[:keys] || [], message_ids || []).map do |_, k, m_id| - ids = {} - - ids[:key] = k if k.present? - ids[:message_id] = m_id if m_id.present? - - ids - end - end + # @deprecated Use Deimos::Consumer with `delivery: inline_batch` configured instead + class BatchConsumer < Consumer end end