Sha256: f2ada239052b69de5440e60a966736da7f7e70aae7a93f30377fa8d5307df311

Contents?: true

Size: 1.42 KB

Versions: 1

Compression:

Stored size: 1.42 KB

Contents

module Karafka
  module Connection
    # Class that consumes messages for which we listen
    class Consumer
      # Consumes a single message by scheduling a task on the controller that
      # matches the message topic
      # @note This should be called in a loop to keep listening for new messages
      # @note We catch all errors here so that a failure while consuming one message
      #   does not affect other consumed messages.
      #   If we didn't catch them, they would propagate up and kill the Celluloid actor
      # @param message [Kafka::FetchedMessage] message that was fetched by kafka
      def consume(message)
        # We map from incoming topic name, as it might be namespaced, etc.
        # @see topic_mapper internal docs
        mapped_topic = Karafka::App.config.topic_mapper.incoming(message.topic)

        controller = Karafka::Routing::Router.new(mapped_topic).build
        # We wrap the payload in our internal message format, so we don't pass around
        # a raw Kafka message
        controller.params = Message.new(mapped_topic, message.value)

        Karafka.monitor.notice(self.class, controller.to_h)

        controller.schedule
        # This is on purpose - see the notes for this method
        # rubocop:disable RescueException
      rescue Exception => e
        # rubocop:enable RescueException
        Karafka.monitor.notice_error(self.class, e)
      end
    end
  end
end
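
The notes on `consume` explain why it rescues `Exception` rather than relying on a bare `rescue`: a failure while handling one message must not stop the consumer from handling the next. The following is a minimal, self-contained sketch of that pattern. It does not use Karafka itself; `SketchMonitor` and `SketchConsumer` are hypothetical stand-ins invented for illustration and are not part of the gem's API.

```ruby
# Hypothetical stand-in for a monitor that records errors
# (an assumption for this sketch, not Karafka's monitor).
class SketchMonitor
  attr_reader :errors

  def initialize
    @errors = []
  end

  # Stores the reporting class together with the raised error
  def notice_error(caller_class, error)
    @errors << [caller_class, error]
  end
end

# Hypothetical consumer that isolates failures per message.
class SketchConsumer
  def initialize(monitor, &handler)
    @monitor = monitor
    @handler = handler
  end

  def consume(message)
    @handler.call(message)
  # A bare `rescue` only catches StandardError. Rescuing Exception also
  # catches errors such as NotImplementedError (a ScriptError), at the cost
  # of also catching Interrupt and SystemExit.
  rescue Exception => e # rubocop:disable Lint/RescueException
    @monitor.notice_error(self.class, e)
  end
end

monitor = SketchMonitor.new
processed = []

consumer = SketchConsumer.new(monitor) do |message|
  # NotImplementedError is not a StandardError, so a bare rescue would miss it
  raise NotImplementedError, 'boom' if message == 'bad'

  processed << message
end

# The failing message in the middle does not prevent the last one from running
%w[first bad last].each { |message| consumer.consume(message) }
```

In this sketch the failing message is reported to the monitor and skipped, while the messages before and after it are still processed. The trade-off is that `rescue Exception` also swallows signals such as `Interrupt`, which is why RuboCop flags it and the original code disables that cop explicitly.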

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
karafka-0.5.0.3 lib/karafka/connection/consumer.rb