Sha256: 6b76c3d42ed224631d74585454e2e232198e6841e5068147720682f6ffd5e404

Contents?: true

Size: 1.17 KB

Versions: 16

Compression:

Stored size: 1.17 KB

Contents

module Messaging
  module Adapters
    class Kafka
      class Producer
        attr_reader :producer
        attr_reader :pid
        attr_reader :kafka

        def initialize(kafka_adapter)
          @kafka = kafka_adapter
          connect

          at_exit { shutdown }
        end

        # Delivers a message to Kafka asynchronously in a background thread.
        # This method will return immediately.
        #
        # @param message
        def call(message)
          reconnect if forked?
          producer.produce(message.to_json, key: message.message_key, topic: message.topic)
        rescue ::Kafka::BufferOverflow => e
          ExceptionHandler.call(e, message: message.to_json)
        end

        def shutdown
          return unless producer

          producer.deliver_messages
          producer.shutdown
        end

        def connect
          @producer = create_producer
          @pid = Process.pid
        end
        alias reconnect connect

        private

        def forked?
          Process.pid != pid
        end

        def create_producer
          kafka.client.async_producer(**Config.kafka.producer.to_h)
        end
      end
    end
  end
end

Version data entries

16 entries across 16 versions & 1 rubygems

Version Path
messaging-4.0.12 lib/messaging/adapters/kafka/producer.rb
messaging-4.0.11 lib/messaging/adapters/kafka/producer.rb
messaging-4.0.10 lib/messaging/adapters/kafka/producer.rb
messaging-4.0.10.pre lib/messaging/adapters/kafka/producer.rb
messaging-4.0.9 lib/messaging/adapters/kafka/producer.rb
messaging-4.0.8 lib/messaging/adapters/kafka/producer.rb
messaging-4.0.7 lib/messaging/adapters/kafka/producer.rb
messaging-4.0.6 lib/messaging/adapters/kafka/producer.rb
messaging-4.0.5 lib/messaging/adapters/kafka/producer.rb
messaging-4.0.4.pre lib/messaging/adapters/kafka/producer.rb
messaging-4.0.3.pre lib/messaging/adapters/kafka/producer.rb
messaging-4.0.2.pre lib/messaging/adapters/kafka/producer.rb
messaging-4.0.1.pre lib/messaging/adapters/kafka/producer.rb
messaging-4.0.0.pre lib/messaging/adapters/kafka/producer.rb
messaging-3.8.2 lib/messaging/adapters/kafka/producer.rb
messaging-3.8.1 lib/messaging/adapters/kafka/producer.rb