Sha256: f90f2d4bf32883d1985599ea049c0dc4c8dcb0babbb6f10afc0ddb52dfcb5e7c

Contents?: true

Size: 1020 Bytes

Versions: 2

Compression:

Stored size: 1020 Bytes

Contents

# frozen_string_literal: true

module Karafka
  # Worker wrapper for Sidekiq workers
  class BaseWorker
    include Sidekiq::Worker

    # Executes the logic that lies in #perform Karafka consumer method
    # @param topic_id [String] Unique topic id that we will use to find a proper topic
    # @param params_batch [Array] Array with messages batch
    def perform(topic_id, params_batch)
      consumer = consumer(topic_id, params_batch)

      Karafka.monitor.instrument(
        'backends.sidekiq.base_worker.perform',
        caller: self,
        consumer: consumer
      ) { consumer.consume }
    end

    private

    # @return [Karafka::Consumer] descendant of Karafka::BaseConsumer that matches the topic
    #   with params_batch assigned already (consumer is ready to use)
    def consumer(topic_id, params_batch)
      topic = Karafka::Routing::Router.find(topic_id)
      consumer = topic.consumer.new
      consumer.params_batch = topic.interchanger.parse(params_batch)
      consumer
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
karafka-sidekiq-backend-1.2.0.beta2 lib/karafka/base_worker.rb
karafka-sidekiq-backend-1.2.0.beta1 lib/karafka/base_worker.rb