Sha256: 78d0b96aae8696963b2a8df57e7a222dc50797bff174ede4e34fd932875b0594
Contents?: true
Size: 1.23 KB
Versions: 3
Compression:
Stored size: 1.23 KB
Contents

    module Karafka
      module Connection
        # Class that consumes messages for which we listen
        class Consumer
          # Consumes a message (does something with it)
          # It will execute a scheduling task from a proper controller based on a message topic
          # @note This should be called in a loop to keep listening continuously
          # @note We catch all the errors here, to make sure that no failure
          #   for a given consumption will affect other consumed messages
          #   If we didn't catch it, it would propagate up until killing the Celluloid actor
          # @param message [Kafka::FetchedMessage] message that was fetched by kafka
          def consume(message)
            controller = Karafka::Routing::Router.new(message.topic).build
            # We wrap it around with our internal message format, so we don't pass around
            # a raw Kafka message
            controller.params = Message.new(message.topic, message.value)
            Karafka.monitor.notice(self.class, controller.to_h)
            controller.schedule
            # This is on purpose - see the notes for this method
            # rubocop:disable RescueException
          rescue Exception => e
            # rubocop:enable RescueException
            Karafka.monitor.notice_error(self.class, e)
          end
        end
      end
    end

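The notes on `consume` describe an isolation pattern: every error raised while handling one message is caught and reported, so later messages are still consumed. The following is a minimal, self-contained sketch of that idea, not Karafka's implementation. `FetchedMessage`, `RecordingMonitor`, `SketchConsumer`, and the `handlers` hash are hypothetical stand-ins for the Kafka message, `Karafka.monitor`, the consumer, and the router/controller pair.

```ruby
# Hypothetical stand-in for a fetched Kafka message (topic + raw value).
FetchedMessage = Struct.new(:topic, :value)

# Hypothetical stand-in for Karafka.monitor: it only records what it is told.
class RecordingMonitor
  attr_reader :notices, :errors

  def initialize
    @notices = []
    @errors = []
  end

  def notice(caller_class, payload)
    @notices << [caller_class, payload]
  end

  def notice_error(caller_class, error)
    @errors << [caller_class, error]
  end
end

# Hypothetical consumer: looks up a handler by topic and runs it.
class SketchConsumer
  # @param monitor [RecordingMonitor] receives notices and errors
  # @param handlers [Hash{String => #call}] assumed topic-to-handler mapping
  def initialize(monitor, handlers)
    @monitor = monitor
    @handlers = handlers
  end

  def consume(message)
    # Raises KeyError for an unknown topic; this is rescued below as well.
    handler = @handlers.fetch(message.topic)
    @monitor.notice(self.class, { topic: message.topic, value: message.value })
    handler.call(message.value)
    # Deliberately broad: one failing message must not stop the loop.
    # rubocop:disable Lint/RescueException
  rescue Exception => e
    # rubocop:enable Lint/RescueException
    @monitor.notice_error(self.class, e)
  end
end

monitor = RecordingMonitor.new
processed = []
handlers = {
  'ok' => ->(value) { processed << value },
  'broken' => ->(_value) { raise ArgumentError, 'bad payload' }
}
consumer = SketchConsumer.new(monitor, handlers)

# The second and third messages fail, yet the fourth is still processed.
[
  FetchedMessage.new('ok', 'first'),
  FetchedMessage.new('broken', 'second'),
  FetchedMessage.new('unknown', 'third'),
  FetchedMessage.new('ok', 'fourth')
].each { |message| consumer.consume(message) }
```

Rescuing `Exception` rather than `StandardError` also swallows `Interrupt` and `SystemExit`, which is why the original marks the choice as intentional and silences the RuboCop warning around it.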
Version data entries
3 entries across 3 versions & 1 rubygems
| Version | Path |
|---|---|
| karafka-0.5.0.2 | lib/karafka/connection/consumer.rb |
| karafka-0.5.0.1 | lib/karafka/connection/consumer.rb |
| karafka-0.5.0 | lib/karafka/connection/consumer.rb |