Sha256: 78d0b96aae8696963b2a8df57e7a222dc50797bff174ede4e34fd932875b0594

Contents?: true

Size: 1.23 KB

Versions: 3

Compression:

Stored size: 1.23 KB

Contents

module Karafka
  module Connection
    # Class that consumes messages for which we listen
    class Consumer
      # Consumes a message (does something with it)
      # It will execute a scheduling task from a proper controller based on a message topic
      # @note This should be looped to obtain a constant listening
      # @note We catch all the errors here, to make sure that none failures
      #   for a given consumption will affect other consumed messages
      #   If we would't catch it, it would propagate up until killing the Celluloid actor
      # @param message [Kafka::FetchedMessage] message that was fetched by kafka
      def consume(message)
        controller = Karafka::Routing::Router.new(message.topic).build
        # We wrap it around with our internal message format, so we don't pass around
        # a raw Kafka message
        controller.params = Message.new(message.topic, message.value)

        Karafka.monitor.notice(self.class, controller.to_h)

        controller.schedule
        # This is on purpose - see the notes for this method
        # rubocop:disable RescueException
      rescue Exception => e
        # rubocop:enable RescueException
        Karafka.monitor.notice_error(self.class, e)
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
karafka-0.5.0.2 lib/karafka/connection/consumer.rb
karafka-0.5.0.1 lib/karafka/connection/consumer.rb
karafka-0.5.0 lib/karafka/connection/consumer.rb