Sha256: d981f5dbb30b8793b385147cfc620e0eb0c6c1f3410222517ec6a52b399ca178

Contents?: true

Size: 1.16 KB

Versions: 4

Compression:

Stored size: 1.16 KB

Contents

module Messaging
  module Adapters
    class Kafka
      class Producer
        attr_reader :producer
        attr_reader :pid
        attr_reader :kafka

        def initialize(kafka_adapter)
          @kafka = kafka_adapter
          connect

          at_exit { shutdown }
        end

        # Delivers a message to Kafka asynchronously in a background thread.
        # This method will return immediately.
        #
        # @param message
        def call(message)
          reconnect if forked?
          producer.produce(message.to_json, key: message.message_key, topic: message.topic)
        rescue ::Kafka::BufferOverflow => e
          ExceptionHandler.call(e, message: message.to_json)
        end

        def shutdown
          return unless producer

          producer.deliver_messages
          producer.shutdown
        end

        def connect
          @producer = create_producer
          @pid = Process.pid
        end
        alias reconnect connect

        private

        def forked?
          Process.pid != pid
        end

        def create_producer
          kafka.client.async_producer(Config.kafka.producer.to_h)
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
messaging-3.8.0 lib/messaging/adapters/kafka/producer.rb
messaging-3.7.3 lib/messaging/adapters/kafka/producer.rb
messaging-3.7.2 lib/messaging/adapters/kafka/producer.rb
messaging-3.7.1 lib/messaging/adapters/kafka/producer.rb