Sha256: 28452afc918f4e3dfe538392666188f2980a59d0abc737f5a3692bfbeac69033

Contents?: true

Size: 889 Bytes

Versions: 5

Compression:

Stored size: 889 Bytes

Contents

# frozen_string_literal: true

module Karafka
  module Persistence
    # Persistence layer to store current thread messages consumer for further use
    class Consumer
      # Thread.current key under which we store current thread messages consumer
      PERSISTENCE_SCOPE = :consumer

      # @param consumer [Karafka::Connection::Consumer] messages consumer of
      #   a current thread
      # @return [Karafka::Connection::Consumer] persisted messages consumer
      def self.write(consumer)
        Thread.current[PERSISTENCE_SCOPE] = consumer
      end

      # @return [Karafka::Connection::Consumer] persisted messages consumer
      # @raise [Karafka::Errors::MissingConsumer] raised when no thread messages consumer
      #   but we try to use it anyway
      def self.read
        Thread.current[PERSISTENCE_SCOPE] || raise(Errors::MissingConsumer)
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
karafka-1.1.2 lib/karafka/persistence/consumer.rb
karafka-1.1.1 lib/karafka/persistence/consumer.rb
karafka-1.1.0 lib/karafka/persistence/consumer.rb
karafka-1.1.0.alpha2 lib/karafka/persistence/consumer.rb
karafka-1.1.0.alpha1 lib/karafka/persistence/consumer.rb