Sha256: befda24bf9183217ce6c0ae4e3d3106667027ef7e63534d63a7725210c971966

Contents?: true

Size: 1.2 KB

Versions: 3

Compression:

Stored size: 1.2 KB

Contents

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"
require "dotenv"

Dotenv.load

KAFKA_CLIENT_CERT = ENV.fetch("KAFKA_CLIENT_CERT")
KAFKA_CLIENT_CERT_KEY = ENV.fetch("KAFKA_CLIENT_CERT_KEY")
KAFKA_SERVER_CERT = ENV.fetch("KAFKA_SERVER_CERT")
KAFKA_URL = ENV.fetch("KAFKA_URL")
KAFKA_BROKERS = KAFKA_URL.gsub("kafka+ssl://", "").split(",")
KAFKA_TOPIC = "test-messages"

NUM_THREADS = 20

threads = NUM_THREADS.times.map do
  Thread.new do
    logger = Logger.new($stderr)
    logger.level = Logger::INFO

    kafka = Kafka.new(
      seed_brokers: KAFKA_BROKERS,
      logger: logger,
      ssl_client_cert: KAFKA_CLIENT_CERT,
      ssl_client_cert_key: KAFKA_CLIENT_CERT_KEY,
      ssl_ca_cert: KAFKA_SERVER_CERT,
    )

    producer = kafka.async_producer(
      delivery_interval: 1,
      max_queue_size: 5_000,
      max_buffer_size: 10_000,
    )

    begin
      loop do
        producer.produce(rand.to_s, key: rand.to_s, topic: KAFKA_TOPIC)
      end
    rescue Kafka::BufferOverflow
      logger.error "Buffer overflow, backing off for 1s"
      sleep 1
      retry
    ensure
      producer.shutdown
    end
  end
end

threads.each {|t| t.abort_on_exception = true }

threads.map(&:join)

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
ruby-kafka-0.3.3 examples/firehose-producer.rb
ruby-kafka-0.3.2 examples/firehose-producer.rb
ruby-kafka-0.3.1 examples/firehose-producer.rb