Sha256: 61786d0683896260f835f2687aebc8869959a7f6fbfa55af2d81bb2466ba70ba

Contents?: true

Size: 1.04 KB

Versions: 1

Compression:

Stored size: 1.04 KB

Contents

# frozen_string_literal: true

module Karafka
  # Sidekiq worker that hands a batch of messages over to the Karafka controller
  # registered for a given topic
  class BaseWorker
    include Sidekiq::Worker

    # Notifies the Karafka monitor, builds the controller for the topic and runs it
    # @param topic_id [String] unique topic id used to look up the topic in the router
    # @param params_batch [Array] batch of messages to be processed
    def perform(topic_id, params_batch)
      Karafka.monitor.notice(self.class, params_batch)
      handler = controller(topic_id, params_batch)
      # Controllers exposing #consume are preferred, the rest fall back to #perform,
      # which allows both Karafka 1.0 and 1.1 to be supported
      return handler.consume if handler.respond_to?(:consume)

      handler.perform
    end

    private

    # Looks up the topic, instantiates its controller and assigns the batch to it after
    # the batch has been passed through the topic interchanger
    # @param topic_id [String] unique topic id used to look up the topic in the router
    # @param params_batch [Array] batch of messages as received by the worker
    # @return [Karafka::Controller] descendant of Karafka::BaseController that matches the
    #   topic, with params_batch already assigned (controller is ready to use)
    def controller(topic_id, params_batch)
      topic = Karafka::Routing::Router.find(topic_id)
      topic.controller.new.tap do |instance|
        instance.params_batch = topic.interchanger.parse(params_batch)
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
karafka-sidekiq-backend-1.1.0.alpha1 lib/karafka/base_worker.rb