Sha256: 2a6b02c6597fe2273a38e95d422ac30e23a8bb0fed68cca28558466ef4379570
Contents?: true
Size: 1.19 KB
Versions: 11
Compression:
Stored size: 1.19 KB
Contents
module Messaging
  module Adapters
    class Kafka
      # Wraps the async producer obtained from the Kafka adapter's client and
      # remembers which process created it, so that a producer created in a
      # different process (i.e. before a fork) is never used or flushed here.
      class Producer
        # @return [Object] the async producer built by the last {#connect}
        attr_reader :producer
        # @return [Integer] pid of the process that ran the last {#connect}
        attr_reader :pid
        # @return [Object] the adapter handed to {#initialize}
        attr_reader :kafka

        # Creates the initial producer and registers an at_exit hook that
        # calls {#shutdown} when the process terminates.
        #
        # @param kafka_adapter [#client] adapter whose `client` responds to
        #   `async_producer`
        def initialize(kafka_adapter)
          @kafka = kafka_adapter
          connect
          at_exit { shutdown }
        end

        # Delivers a message to Kafka asynchronously in a background thread.
        # This method will return immediately.
        #
        # If the current process is not the one that created the producer, a
        # fresh producer is created first. A buffer overflow is reported via
        # Trouble.notify instead of being raised to the caller.
        #
        # @param message [#to_json, #message_key, #topic] the message to send
        def call(message)
          reconnect if forked?
          producer.produce(message.to_json, key: message.message_key, topic: message.topic)
        rescue ::Kafka::BufferOverflow => exception
          Trouble.notify(exception, pid: Process.pid, message: message.to_json)
        end

        # Flushes pending messages and shuts the producer down.
        #
        # Does nothing when there is no producer, or when the producer was
        # created by another process: at_exit hooks are inherited across fork,
        # so a child that never called {#call} would otherwise flush and shut
        # down the producer object it inherited from its parent.
        #
        # @return [void]
        def shutdown
          return unless producer
          # Same staleness rule as #call: only touch a producer this process built.
          return if forked?

          producer.deliver_messages
          producer.shutdown
        end

        # Builds a new producer and records the current pid as its owner.
        #
        # @return [Integer] the current process id
        def connect
          @producer = create_producer
          @pid = Process.pid
        end
        alias reconnect connect

        private

        # @return [Boolean] true when the current process is not the one that
        #   created the producer
        def forked?
          Process.pid != pid
        end

        # @return [Object] a new async producer configured from
        #   Config.kafka.producer
        def create_producer
          kafka.client.async_producer(Config.kafka.producer.to_hash)
        end
      end
    end
  end
end
Version data entries
11 entries across 11 versions & 1 rubygems