Sha256: 61786d0683896260f835f2687aebc8869959a7f6fbfa55af2d81bb2466ba70ba
Contents?: true
Size: 1.04 KB
Versions: 1
Compression:
Stored size: 1.04 KB
Contents
# frozen_string_literal: true module Karafka # Worker wrapper for Sidekiq workers class BaseWorker include Sidekiq::Worker # Executes the logic that lies in #perform Karafka controller method # @param topic_id [String] Unique topic id that we will use to find a proper topic # @param params_batch [Array] Array with messages batch def perform(topic_id, params_batch) Karafka.monitor.notice(self.class, params_batch) instance = controller(topic_id, params_batch) # This allows to support both Karafka 1.0 and 1.1 instance.respond_to?(:consume) ? instance.consume : instance.perform end private # @return [Karafka::Controller] descendant of Karafka::BaseController that matches the topic # with params_batch assigned already (controller is ready to use) def controller(topic_id, params_batch) topic = Karafka::Routing::Router.find(topic_id) controller = topic.controller.new controller.params_batch = topic.interchanger.parse(params_batch) controller end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
karafka-sidekiq-backend-1.1.0.alpha1 | lib/karafka/base_worker.rb |