lib/karafka/persistence/consumer.rb in karafka-1.1.2 vs lib/karafka/persistence/consumer.rb in karafka-1.2.0.beta1

- old
+ new

@@ -1,25 +1,38 @@ # frozen_string_literal: true module Karafka + # Module used to provide a persistent cache layer for Karafka components that need to be + # shared inside of a same thread module Persistence - # Persistence layer to store current thread messages consumer for further use + # Module used to provide a persistent cache across batch requests for a given + # topic and partition to store some additional details when the persistent mode + # for a given topic is turned on class Consumer - # Thread.current key under which we store current thread messages consumer - PERSISTENCE_SCOPE = :consumer + # Thread.current scope under which we store consumers data + PERSISTENCE_SCOPE = :consumers - # @param consumer [Karafka::Connection::Consumer] messages consumer of - # a current thread - # @return [Karafka::Connection::Consumer] persisted messages consumer - def self.write(consumer) - Thread.current[PERSISTENCE_SCOPE] = consumer - end + class << self + # @return [Hash] current thread persistence scope hash with all the consumers + def all + # @note This does not need to be threadsafe (Hash) as it is always executed in a + # current thread context + Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} } + end - # @return [Karafka::Connection::Consumer] persisted messages consumer - # @raise [Karafka::Errors::MissingConsumer] raised when no thread messages consumer - # but we try to use it anyway - def self.read - Thread.current[PERSISTENCE_SCOPE] || raise(Errors::MissingConsumer) + # Used to build (if block given) and/or fetch a current consumer instance that will be + # used to process messages from a given topic and partition + # @return [Karafka::BaseConsumer] base consumer descendant + # @param topic [Karafka::Routing::Topic] topic instance for which we might cache + # @param partition [Integer] number of partition for which we want to cache + def fetch(topic, partition) + # We always store a current instance for callback reasons + if topic.persistent + all[topic][partition] ||= topic.consumer.new + else + all[topic][partition] = topic.consumer.new + end + end end end end end