Sha256: 2a6b02c6597fe2273a38e95d422ac30e23a8bb0fed68cca28558466ef4379570

Contents?: true

Size: 1.19 KB

Versions: 11

Compression:

Stored size: 1.19 KB

Contents

module Messaging
  module Adapters
    class Kafka
      class Producer
        attr_reader :producer
        attr_reader :pid
        attr_reader :kafka

        def initialize(kafka_adapter)
          @kafka = kafka_adapter
          connect

          at_exit { shutdown }
        end

        # Delivers a message to Kafka asynchronously in a background thread.
        # This method will return immediately.
        #
        # @param message
        def call(message)
          reconnect if forked?
          producer.produce(message.to_json, key: message.message_key, topic: message.topic)
        rescue ::Kafka::BufferOverflow => exception
          Trouble.notify(exception, pid: Process.pid, message: message.to_json)
        end

        def shutdown
          return unless producer

          producer.deliver_messages
          producer.shutdown
        end

        def connect
          @producer = create_producer
          @pid = Process.pid
        end
        alias reconnect connect

        private

        def forked?
          Process.pid != pid
        end

        def create_producer
          kafka.client.async_producer(Config.kafka.producer.to_hash)
        end
      end
    end
  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
messaging-3.6.0 lib/messaging/adapters/kafka/producer.rb
messaging-3.5.7 lib/messaging/adapters/kafka/producer.rb
messaging-3.5.6 lib/messaging/adapters/kafka/producer.rb
messaging-3.5.5 lib/messaging/adapters/kafka/producer.rb
messaging-3.5.4 lib/messaging/adapters/kafka/producer.rb
messaging-3.5.3 lib/messaging/adapters/kafka/producer.rb
messaging-3.5.2 lib/messaging/adapters/kafka/producer.rb
messaging-3.5.1 lib/messaging/adapters/kafka/producer.rb
messaging-3.4.3 lib/messaging/adapters/kafka/producer.rb
messaging-3.4.2 lib/messaging/adapters/kafka/producer.rb
messaging-3.4.1 lib/messaging/adapters/kafka/producer.rb