Sha256: d981f5dbb30b8793b385147cfc620e0eb0c6c1f3410222517ec6a52b399ca178
Contents?: true
Size: 1.16 KB
Versions: 4
Compression:
Stored size: 1.16 KB
Contents
module Messaging
  module Adapters
    class Kafka
      # Wraps the asynchronous producer obtained from the Kafka adapter's
      # client and rebuilds it when the process id changes (i.e. after a fork).
      class Producer
        # producer - the async producer currently in use
        # pid      - process id recorded when that producer was built
        # kafka    - the adapter handed to the constructor
        attr_reader :producer, :pid, :kafka

        # Stores the adapter, builds the first producer and registers an
        # at_exit hook that calls #shutdown.
        #
        # @param kafka_adapter an object responding to +client+
        def initialize(kafka_adapter)
          @kafka = kafka_adapter
          connect
          at_exit do
            shutdown
          end
        end

        # Hands a message to the async producer. Per the original note this
        # enqueues the message for background delivery and returns immediately.
        #
        # A ::Kafka::BufferOverflow raised along the way is passed to
        # ExceptionHandler together with the serialized message instead of
        # propagating to the caller.
        #
        # @param message an object responding to +to_json+, +message_key+
        #   and +topic+
        def call(message)
          begin
            # A producer built under a different pid is replaced before use.
            reconnect if forked?

            payload     = message.to_json
            routing_key = message.message_key
            destination = message.topic
            producer.produce(payload, key: routing_key, topic: destination)
          rescue ::Kafka::BufferOverflow => overflow
            ExceptionHandler.call(overflow, message: message.to_json)
          end
        end

        # Asks the producer to deliver its messages and then shuts it down.
        # Does nothing (returns nil) when no producer has been built.
        def shutdown
          if producer
            producer.deliver_messages
            producer.shutdown
          end
        end

        # Builds a new producer and records the pid of the process that
        # built it. The producer is assigned first so that a failure while
        # building leaves the recorded pid untouched.
        #
        # @return [Integer] the current process id
        def connect
          @producer = create_producer
          @pid = Process.pid
        end
        alias reconnect connect

        private

        # True when the current process is not the one that built the
        # producer.
        def forked?
          Process.pid != pid
        end

        # Requests an async producer from the adapter's client, configured
        # from Config.kafka.producer.
        def create_producer
          settings = Config.kafka.producer.to_h
          kafka.client.async_producer(settings)
        end
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems