Sha256: c753f62dfa47043e15b48b2fd7018a58c52ef5c9e2bab383c05ba947c384ec4c
Contents?: true
Size: 1.78 KB
Versions: 14
Compression:
Stored size: 1.78 KB
Contents
require 'kafka'
require_relative 'kafka/consumer'
require_relative 'kafka/producer'

module Messaging
  module Adapters
    # Internal: Adapter that exposes Kafka producing and consuming to the
    # Messaging adapter registries.
    #
    # Loading this file registers the adapter (see .register! at the bottom
    # of the class body).
    class Kafka
      # Internal: The Ruby-Kafka client belonging to the current thread.
      #
      # The client is cached in Thread.current because Ruby-Kafka clients are
      # not meant to be shared between threads.
      # See https://github.com/zendesk/ruby-kafka#thread-safety for details.
      #
      # NOTE(review): Thread#[] storage is fiber-local in Ruby, so a new
      # client is also created for every fiber that calls this method.
      #
      # Returns the cached client, creating it on first use.
      def client
        current_thread = Thread.current
        current_thread[:messaging_kafka_client] ||= create_kafka_client
      end

      # Internal: The producer used to dispatch messages.
      #
      # Unlike the client it is memoized on the adapter instance rather than
      # per thread, since it delivers messages from a background thread.
      #
      # Returns the memoized Producer.
      def dispatcher
        @dispatcher ||= Producer.new(self)
      end

      # Internal: Build a Messaging consumer backed by this adapter.
      #
      # name    - The name given to the consumer.
      # options - Extra keyword options forwarded to Consumer.new. They are
      #           merged last, so they win over the defaults set here.
      #
      # Returns a new Consumer.
      def create_consumer(name, **options)
        consumer_options = { name: name, kafka_adapter: self }.merge(options)
        Consumer.new(**consumer_options)
      end

      # Internal: Setup a Ruby-Kafka consumer on the current thread's client.
      #
      # group_id - The consumer group id. Values from Config.kafka.consumer
      #            are merged on top of it.
      #
      # Returns whatever the client's #consumer call returns.
      def create_kafka_consumer(group_id)
        # Resolve the client first so it is created before the config is read.
        kafka_client = client
        consumer_options = { group_id: group_id }.merge(Config.kafka.consumer.to_h)

        kafka_client.consumer(**consumer_options)
      end

      private

      # Internal: Setup a Ruby-Kafka client from Config.kafka.client.
      #
      # The client_id and logger set here override any values with the same
      # keys in the configured client options.
      #
      # Returns a new ::Kafka client.
      def create_kafka_client
        # Ruby-Kafka is noisier than we would like, so it gets its own copy
        # of the logger with a separately configured level.
        kafka_logger = Messaging.logger.dup
        kafka_logger.level = Config.kafka.log_level

        client_options = Config.kafka.client.to_h.merge(
          client_id: Config.app_name,
          logger: kafka_logger
        )

        ::Kafka.new(**client_options)
      end

      class << self
        private

        # Internal: Register the adapter, its consumer and its dispatcher
        # under the :kafka key. Does nothing when :kafka is already present
        # in Adapters.
        def register!
          unless Adapters.key?(:kafka)
            Adapters.register(:kafka, memoize: true) { Kafka.new }
            Adapters::Consumer.register(:kafka, memoize: true) { Adapters[:kafka] }
            Adapters::Dispatcher.register(:kafka, memoize: true) { Adapters[:kafka].dispatcher }
          end
        end
      end

      register!
    end
  end
end
Version data entries
14 entries across 14 versions & 1 rubygems