lib/karafka/base_worker.rb in karafka-sidekiq-backend-1.2.0 vs lib/karafka/base_worker.rb in karafka-sidekiq-backend-1.3.0.rc1
- old
+ new
@@ -3,30 +3,64 @@
module Karafka
# Worker wrapper for Sidekiq workers
class BaseWorker
include Sidekiq::Worker
+ class << self
+ # Returns the base worker class for application.
+ #
+ # @return [Class] first worker that inherited from Karafka::BaseWorker. Karafka
+ # assumes that it is the base worker for an application.
+ # @raise [Karafka::Errors::BaseWorkerDescentantMissing] raised when application
+ # base worker was not defined.
+ def base_worker
+ @inherited || raise(Errors::BaseWorkerDescentantMissing)
+ end
+
+ # @param subclass [Class] subclass of the worker
+ # @return [Class] subclass of the worker that was selected
+ def inherited(subclass)
+ @inherited ||= subclass
+ end
+ end
+
# Executes the logic that lies in #perform Karafka consumer method
# @param topic_id [String] Unique topic id that we will use to find a proper topic
- # @param params_batch [Array] Array with messages batch
- def perform(topic_id, params_batch)
- consumer = consumer(topic_id, params_batch)
+ # @param params_batch [Array<Hash>] Array with messages batch
+ # @param metadata [Hash, nil] hash with all the metadata or nil if not present
+ def perform(topic_id, params_batch, metadata)
+ consumer = consumer(topic_id, params_batch, metadata)
Karafka.monitor.instrument(
'backends.sidekiq.base_worker.perform',
caller: self,
consumer: consumer
) { consumer.consume }
end
private
+ # @see `#perform` for exact params descriptions
+ # @param topic_id [String]
+ # @param params_batch [Array<Hash>]
+ # @param metadata [Hash, nil]
# @return [Karafka::Consumer] descendant of Karafka::BaseConsumer that matches the topic
# with params_batch assigned already (consumer is ready to use)
- def consumer(topic_id, params_batch)
+ def consumer(topic_id, params_batch, metadata)
topic = Karafka::Routing::Router.find(topic_id)
- consumer = topic.consumer.new
- consumer.params_batch = topic.interchanger.decode(params_batch)
+ consumer = topic.consumer.new(topic)
+ consumer.params_batch = Params::Builders::ParamsBatch.from_array(
+ topic.interchanger.decode(params_batch),
+ topic
+ )
+
+ if topic.batch_fetching
+ consumer.metadata = Params::Builders::Metadata.from_hash(
+ metadata,
+ topic
+ )
+ end
+
consumer
end
end
end