Sha256: f90f2d4bf32883d1985599ea049c0dc4c8dcb0babbb6f10afc0ddb52dfcb5e7c
Contents?: true
Size: 1020 Bytes
Versions: 2
Compression:
Stored size: 1020 Bytes
Contents
# frozen_string_literal: true module Karafka # Worker wrapper for Sidekiq workers class BaseWorker include Sidekiq::Worker # Executes the logic that lies in #perform Karafka consumer method # @param topic_id [String] Unique topic id that we will use to find a proper topic # @param params_batch [Array] Array with messages batch def perform(topic_id, params_batch) consumer = consumer(topic_id, params_batch) Karafka.monitor.instrument( 'backends.sidekiq.base_worker.perform', caller: self, consumer: consumer ) { consumer.consume } end private # @return [Karafka::Consumer] descendant of Karafka::BaseConsumer that matches the topic # with params_batch assigned already (consumer is ready to use) def consumer(topic_id, params_batch) topic = Karafka::Routing::Router.find(topic_id) consumer = topic.consumer.new consumer.params_batch = topic.interchanger.parse(params_batch) consumer end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
karafka-sidekiq-backend-1.2.0.beta2 | lib/karafka/base_worker.rb |
karafka-sidekiq-backend-1.2.0.beta1 | lib/karafka/base_worker.rb |