lib/karafka/base_worker.rb in karafka-sidekiq-backend-1.1.0 vs lib/karafka/base_worker.rb in karafka-sidekiq-backend-1.2.0.beta1
- old
+ new
@@ -3,25 +3,30 @@
module Karafka
# Worker wrapper for Sidekiq workers
class BaseWorker
include Sidekiq::Worker
- # Executes the logic that lies in #perform Karafka controller method
+ # Executes the logic that lies in #perform Karafka consumer method
# @param topic_id [String] Unique topic id that we will use to find a proper topic
# @param params_batch [Array] Array with messages batch
def perform(topic_id, params_batch)
- Karafka.monitor.notice(self.class, params_batch)
- controller(topic_id, params_batch).consume
+ consumer = consumer(topic_id, params_batch)
+
+ Karafka.monitor.instrument(
+ 'backends.sidekiq.base_worker.perform',
+ caller: self,
+ consumer: consumer
+ ) { consumer.consume }
end
private
- # @return [Karafka::Controller] descendant of Karafka::BaseController that matches the topic
- # with params_batch assigned already (controller is ready to use)
- def controller(topic_id, params_batch)
+ # @return [Karafka::Consumer] descendant of Karafka::BaseConsumer that matches the topic
+ # with params_batch assigned already (consumer is ready to use)
+ def consumer(topic_id, params_batch)
topic = Karafka::Routing::Router.find(topic_id)
- controller = topic.controller.new
- controller.params_batch = topic.interchanger.parse(params_batch)
- controller
+ consumer = topic.consumer.new
+ consumer.params_batch = topic.interchanger.parse(params_batch)
+ consumer
end
end
end