lib/karafka/persistence/controller.rb in karafka-1.0.1 vs lib/karafka/persistence/controller.rb in karafka-1.1.0.alpha1

- old
+ new

@@ -6,18 +6,33 @@ module Persistence # Module used to provide a persistent cache across batch requests for a given # topic and partition to store some additional details when the persistent mode # for a given topic is turned on class Controller - # Used to build (if block given) and/or fetch a current controller instance that will be used - # to process messages from a given topic and partition - # @return [Karafka::BaseController] base controller descendant - # @param topic [Karafka::Routing::Topic] topic instance for which we might cache - # @param partition [Integer] number of partition for which we want to cache - def self.fetch(topic, partition) - return yield unless topic.persistent - Thread.current[topic.id] ||= {} - Thread.current[topic.id][partition] ||= yield + # Thread.current scope under which we store controllers data + PERSISTENCE_SCOPE = :controllers + + class << self + # @return [Hash] current thread persistence scope hash with all the controllers + def all + Thread.current[PERSISTENCE_SCOPE] ||= {} + end + + # Used to build (if block given) and/or fetch a current controller instance that will be + # used to process messages from a given topic and partition + # @return [Karafka::BaseController] base controller descendant + # @param topic [Karafka::Routing::Topic] topic instance for which we might cache + # @param partition [Integer] number of partition for which we want to cache + def fetch(topic, partition) + all[topic.id] ||= {} + + # We always store a current instance + if topic.persistent + all[topic.id][partition] ||= yield + else + all[topic.id][partition] = yield + end + end end end end end