Sha256: 36b5304265df95ce00d4d036cc43fb84d5e5aff715cd392720ac507bb20cc767

Contents?: true

Size: 905 Bytes

Versions: 2

Compression:

Stored size: 905 Bytes

Contents

# frozen_string_literal: true

module Karafka
  # Worker wrapper for Sidekiq workers
  class BaseWorker
    include Sidekiq::Worker

    # Executes the logic that lies in #perform Karafka controller method
    # @param topic_id [String] Unique topic id that we will use to find a proper topic
    # @param params_batch [Array] Array with messages batch
    def perform(topic_id, params_batch)
      Karafka.monitor.notice(self.class, params_batch)
      controller(topic_id, params_batch).call
    end

    private

    # @return [Karafka::Controller] descendant of Karafka::BaseController that matches the topic
    #   with params_batch assigned already (controller is ready to use)
    def controller(topic_id, params_batch)
      @controller ||= Karafka::Routing::Router.build(topic_id).tap do |ctrl|
        ctrl.params_batch = ctrl.topic.interchanger.parse(params_batch)
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
karafka-0.6.0.rc2 lib/karafka/base_worker.rb
karafka-0.6.0.rc1 lib/karafka/base_worker.rb