lib/karafka/base_worker.rb in karafka-sidekiq-backend-1.1.0 vs lib/karafka/base_worker.rb in karafka-sidekiq-backend-1.2.0.beta1

- old
+ new

@@ -3,25 +3,30 @@ module Karafka # Worker wrapper for Sidekiq workers class BaseWorker include Sidekiq::Worker - # Executes the logic that lies in #perform Karafka controller method + # Executes the logic that lies in #perform Karafka consumer method # @param topic_id [String] Unique topic id that we will use to find a proper topic # @param params_batch [Array] Array with messages batch def perform(topic_id, params_batch) - Karafka.monitor.notice(self.class, params_batch) - controller(topic_id, params_batch).consume + consumer = consumer(topic_id, params_batch) + + Karafka.monitor.instrument( + 'backends.sidekiq.base_worker.perform', + caller: self, + consumer: consumer + ) { consumer.consume } end private - # @return [Karafka::Controller] descendant of Karafka::BaseController that matches the topic - # with params_batch assigned already (controller is ready to use) - def controller(topic_id, params_batch) + # @return [Karafka::Consumer] descendant of Karafka::BaseConsumer that matches the topic + # with params_batch assigned already (consumer is ready to use) + def consumer(topic_id, params_batch) topic = Karafka::Routing::Router.find(topic_id) - controller = topic.controller.new - controller.params_batch = topic.interchanger.parse(params_batch) - controller + consumer = topic.consumer.new + consumer.params_batch = topic.interchanger.parse(params_batch) + consumer end end end