Sha256: 401becfcdcdf36804331eb3af0f3bfdaa77a5c813d3758990740edb58c82c9da

Contents?: true

Size: 1.6 KB

Versions: 20

Compression:

Stored size: 1.6 KB

Contents

# frozen_string_literal: true

require 'deimos/consume/batch_consumption'
require 'deimos/consume/message_consumption'

# Class to consume messages coming from a Kafka topic
# Note: According to the docs, instances of your handler will be created
# for every incoming message/batch. This class should be lightweight.
module Deimos
  # Basic consumer class. Inherit from this class and override either consume_message
  # or consume_batch, depending on the `:batch` config setting.
  class Consumer < Karafka::BaseConsumer
    include Consume::MessageConsumption
    include Consume::BatchConsumption
    include SharedConfig

    def consume
      if self.topic.each_message
        _consume_messages
      else
        _consume_batch
      end
    end

  private

    def _with_span
      @span = Deimos.config.tracer&.start(
        'deimos-consumer',
        resource: self.class.name.gsub('::', '-')
      )
      yield
    ensure
      Deimos.config.tracer&.finish(@span)
    end

    # Overrideable method to determine if a given error should be considered
    # "fatal" and always be reraised.
    # @param _error [Exception]
    # @param _messages [Array<Karafka::Message>]
    # @return [Boolean]
    def fatal_error?(_error, _messages)
      false
    end

    # @param exception [Exception]
    # @param messages [Array<Karafka::Message>]
    def _error(exception, messages)
      Deimos.config.tracer&.set_error(@span, exception)

      raise if self.topic.reraise_errors ||
               Deimos.config.consumers.fatal_error&.call(exception, messages) ||
               fatal_error?(exception, messages)
    end

  end
end

Version data entries

20 entries across 20 versions & 1 rubygems

Version Path
deimos-ruby-2.0.5 lib/deimos/consumer.rb
deimos-ruby-2.0.4 lib/deimos/consumer.rb
deimos-ruby-2.0.3 lib/deimos/consumer.rb
deimos-ruby-2.0.2 lib/deimos/consumer.rb
deimos-ruby-2.0.1 lib/deimos/consumer.rb
deimos-ruby-2.0.0 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.beta7 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.beta6 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.beta5 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.beta4 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.beta3 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.beta2 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.beta1 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.alpha7 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.alpha6 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.alpha5 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.alpha4 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.alpha3 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.alpha2 lib/deimos/consumer.rb
deimos-ruby-2.0.0.pre.alpha1 lib/deimos/consumer.rb