Sha256: 6b76c3d42ed224631d74585454e2e232198e6841e5068147720682f6ffd5e404
Contents?: true
Size: 1.17 KB
Versions: 16
Compression:
Stored size: 1.17 KB
Contents
module Messaging module Adapters class Kafka class Producer attr_reader :producer attr_reader :pid attr_reader :kafka def initialize(kafka_adapter) @kafka = kafka_adapter connect at_exit { shutdown } end # Delivers a message to Kafka asynchronously in a background thread. # This method will return immediately. # # @param message def call(message) reconnect if forked? producer.produce(message.to_json, key: message.message_key, topic: message.topic) rescue ::Kafka::BufferOverflow => e ExceptionHandler.call(e, message: message.to_json) end def shutdown return unless producer producer.deliver_messages producer.shutdown end def connect @producer = create_producer @pid = Process.pid end alias reconnect connect private def forked? Process.pid != pid end def create_producer kafka.client.async_producer(**Config.kafka.producer.to_h) end end end end end
Version data entries
16 entries across 16 versions & 1 rubygems