Sha256: ae74b8cdde1a664d31d5cedb9521389cef81de206b56b97ecb3f478a7de62b4a
Contents?: true
Size: 1.78 KB
Versions: 15
Compression:
Stored size: 1.78 KB
Contents
require 'kafka'
require_relative 'kafka/consumer'
require_relative 'kafka/producer'

module Messaging
  module Adapters
    # Internal: Adapter for producing and consuming messages with Kafka.
    #
    # The class registers itself (see .register!) when this file is loaded.
    class Kafka
      # Internal: Ruby-Kafka client for the current thread.
      #
      # We keep one per thread as the client itself is not meant to be shared.
      # See https://github.com/zendesk/ruby-kafka#thread-safety for more information.
      #
      # Note: Thread#[] storage is fiber-local, so a client is also created
      # lazily for every fiber that calls this method.
      #
      # Returns the memoized client built by #create_kafka_client.
      def client
        Thread.current[:messaging_kafka_client] ||= create_kafka_client
      end

      # Internal: The producer used to dispatch messages.
      #
      # The producer doesn't need to be per thread as it
      # uses a background thread to deliver messages.
      #
      # Returns the memoized Producer bound to this adapter.
      def dispatcher
        @dispatcher ||= Producer.new(self)
      end

      # Internal: Build a consumer bound to this adapter.
      #
      # name    - The name passed to the Consumer as `name:`.
      # options - Additional keyword options forwarded untouched to Consumer.new.
      #
      # Returns a new Consumer.
      def create_consumer(name, **options)
        Consumer.new(name: name, kafka_adapter: self, **options)
      end

      # Internal: Setup a Ruby-Kafka consumer
      #
      # group_id - The consumer group id. Values from Config.kafka.consumer are
      #            merged on top, so a group_id configured there takes precedence.
      #
      # Returns whatever the Ruby-Kafka client's #consumer returns.
      def create_kafka_consumer(group_id)
        # Splat explicitly: Ruby 3 no longer converts a trailing positional
        # Hash into keyword arguments, and Client#consumer is keyword-only.
        client.consumer(**{ group_id: group_id }.merge(Config.kafka.consumer.to_hash))
      end

      private

      # Internal: Setup a Ruby-Kafka client
      #
      # Returns a new ::Kafka client configured from Config.kafka.client, with
      # client_id and logger overridden by this adapter.
      def create_kafka_client
        # We have a specific logger for Ruby-Kafka as it
        # is a bit more noisy than we would like.
        kafka_logger = Messaging.logger.dup.tap { |logger| logger.level = Config.kafka.log_level }

        # Splat explicitly so the options reach ::Kafka.new as keywords on
        # Ruby 3 instead of being bound to its optional positional argument.
        ::Kafka.new(**Config.kafka.client.to_hash.merge(client_id: Config.app_name, logger: kafka_logger))
      end

      # Internal: Register this adapter, its consumer factory and its dispatcher
      # under the :kafka key. Does nothing when Adapters already has a :kafka key.
      def self.register!
        return if Adapters.key? :kafka

        Adapters.register(:kafka, memoize: true) { Kafka.new }
        Adapters::Consumer.register(:kafka, memoize: true) { Adapters[:kafka] }
        Adapters::Dispatcher.register(:kafka, memoize: true) { Adapters[:kafka].dispatcher }
      end
      private_class_method :register!

      register!
    end
  end
end
Version data entries
15 entries across 15 versions & 1 rubygems