lib/karafka/persistence/consumer.rb in karafka-1.1.2 vs lib/karafka/persistence/consumer.rb in karafka-1.2.0.beta1
- old
+ new
@@ -1,25 +1,38 @@
# frozen_string_literal: true
module Karafka
+ # Module used to provide a persistent cache layer for Karafka components that need to be
+ # shared inside of a same thread
module Persistence
- # Persistence layer to store current thread messages consumer for further use
+ # Module used to provide a persistent cache across batch requests for a given
+ # topic and partition to store some additional details when the persistent mode
+ # for a given topic is turned on
class Consumer
- # Thread.current key under which we store current thread messages consumer
- PERSISTENCE_SCOPE = :consumer
+ # Thread.current scope under which we store consumers data
+ PERSISTENCE_SCOPE = :consumers
- # @param consumer [Karafka::Connection::Consumer] messages consumer of
- # a current thread
- # @return [Karafka::Connection::Consumer] persisted messages consumer
- def self.write(consumer)
- Thread.current[PERSISTENCE_SCOPE] = consumer
- end
+ class << self
+ # @return [Hash] current thread persistence scope hash with all the consumers
+ def all
+ # @note This does not need to be threadsafe (Hash) as it is always executed in a
+ # current thread context
+ Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }
+ end
- # @return [Karafka::Connection::Consumer] persisted messages consumer
- # @raise [Karafka::Errors::MissingConsumer] raised when no thread messages consumer
- # but we try to use it anyway
- def self.read
- Thread.current[PERSISTENCE_SCOPE] || raise(Errors::MissingConsumer)
+ # Used to build (if block given) and/or fetch a current consumer instance that will be
+ # used to process messages from a given topic and partition
+ # @return [Karafka::BaseConsumer] base consumer descendant
+ # @param topic [Karafka::Routing::Topic] topic instance for which we might cache
+ # @param partition [Integer] number of partition for which we want to cache
+ def fetch(topic, partition)
+ # We always store a current instance for callback reasons
+ if topic.persistent
+ all[topic][partition] ||= topic.consumer.new
+ else
+ all[topic][partition] = topic.consumer.new
+ end
+ end
end
end
end
end