Sha256: dd0cda1dd75059a81e4757b886f60209f5ba765547987ff5a6f23e520cd06af8
Contents?: true
Size: 1.78 KB
Versions: 3
Compression:
Stored size: 1.78 KB
Contents
require 'kafka' require_relative 'kafka/consumer' require_relative 'kafka/producer' module Messaging module Adapters # Internal: Adapter for producing and consuming messages with Kafka class Kafka # Internal: Ruby-Kafka client for the current thread. # # We keep one per thread as the client itself is not meant to be shared. # See https://github.com/zendesk/ruby-kafka#thread-safety for more information. def client Thread.current[:messaging_kafka_client] ||= create_kafka_client end # The producer doesn't need to be per thread as it # uses a background thread to deliver messages. def dispatcher @dispatcher ||= Producer.new(self) end def create_consumer(name, **options) Consumer.new(name: name, kafka_adapter: self, **options) end # Internal: Setup a Ruby-Kafka consumer def create_kafka_consumer(group_id) client.consumer({ group_id: group_id }.merge(Config.kafka.consumer.to_h)) end private # Internal: Setup a Ruby-Kafka client def create_kafka_client # We have a specific logger for Ruby-Kafka as it # is a bit more noisy than we would like. kafka_logger = Messaging.logger.dup.tap { |logger| logger.level = Config.kafka.log_level } ::Kafka.new(Config.kafka.client.to_h.merge(client_id: Config.app_name, logger: kafka_logger)) end def self.register! return if Adapters.key? :kafka Adapters.register(:kafka, memoize: true) { Kafka.new } Adapters::Consumer.register(:kafka, memoize: true) { Adapters[:kafka] } Adapters::Dispatcher.register(:kafka, memoize: true) { Adapters[:kafka].dispatcher } end private_class_method :register! register! end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
messaging-3.8.0 | lib/messaging/adapters/kafka.rb |
messaging-3.7.3 | lib/messaging/adapters/kafka.rb |
messaging-3.7.2 | lib/messaging/adapters/kafka.rb |