Sha256: 322ad7cb2aac7bc088565efeeeb5cffd7130e4518b69c195a51d89fa095c9288

Contents?: true

Size: 403 Bytes

Versions: 2

Compression:

Stored size: 403 Bytes

Contents

require 'kafka'

class KafkaSession
  class Producer
    def initialize(name:, brokers:)
      @client = Kafka.new(
        seed_brokers: brokers,
        client_id:    name
      )
    end

    def publish(messages:, topic:)
      producer = @client.producer

      messages.each do |message|
        producer.produce(message, topic: topic)
      end

      producer.deliver_messages
    end
  end
end
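
Usage sketch

The listing above only defines the class, so here is a minimal, hedged sketch of how `publish` might be called. So that it runs without a broker or the `kafka` gem, it swaps in a tiny in-memory stand-in for `Kafka.new`. The names `FakeClient`, `FakeProducer`, `demo-app`, `greetings` and the broker address are all hypothetical and not part of the gem. The `KafkaSession::Producer` body is repeated unchanged, minus `require 'kafka'`.

```ruby
# In-memory stand-in for the kafka gem (hypothetical, illustration only).
# It mimics just the calls the listing uses: Kafka.new, #producer,
# #produce and #deliver_messages.
module Kafka
  class FakeProducer
    attr_reader :delivered

    def initialize
      @buffer    = []
      @delivered = []
    end

    # Buffer a message; nothing counts as sent until deliver_messages runs.
    def produce(value, topic:)
      @buffer << [topic, value]
    end

    # Flush the buffer in order, standing in for the network send.
    def deliver_messages
      @delivered.concat(@buffer)
      @buffer.clear
    end
  end

  class FakeClient
    attr_reader :producers

    def initialize(seed_brokers:, client_id:)
      @seed_brokers = seed_brokers
      @client_id    = client_id
      @producers    = []
    end

    # Hand out a fresh producer per call and remember it for inspection.
    def producer
      FakeProducer.new.tap { |p| @producers << p }
    end
  end

  def self.new(**options)
    FakeClient.new(**options)
  end
end

# Same class as in the listing, without `require 'kafka'`.
class KafkaSession
  class Producer
    def initialize(name:, brokers:)
      @client = Kafka.new(
        seed_brokers: brokers,
        client_id:    name
      )
    end

    def publish(messages:, topic:)
      producer = @client.producer

      messages.each do |message|
        producer.produce(message, topic: topic)
      end

      producer.deliver_messages
    end
  end
end

session = KafkaSession::Producer.new(name: "demo-app", brokers: ["localhost:9092"])
session.publish(messages: ["hello", "world"], topic: "greetings")

# Peek at the stand-in client to see what was "delivered" (test-only trick).
client = session.instance_variable_get(:@client)
p client.producers.first.delivered
# → [["greetings", "hello"], ["greetings", "world"]]
```

Each `publish` call asks the client for a new producer, buffers every message, and flushes the buffer once at the end. With the real gem the brokers must be reachable, and since the listing never calls `shutdown` on the producer, long-running callers may want to check how connections are released.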

Version data entries

2 entries across 2 versions and 1 rubygem

Version                         Path
kafka_session-0.4.0             lib/kafka_session/producer.rb
kafka_session-0.4.0.pre.alpha   lib/kafka_session/producer.rb