Sha256: 50b2116bbb92df58705f6f1858c8c1baa1321e39ae981c39482211bcbf31b714

Contents?: true

Size: 1.32 KB

Versions: 51

Compression:

Stored size: 1.32 KB

Contents

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"
require "dotenv"

# Run dotenv before the ENV lookups below so that variables it provides are
# visible to them (dotenv is expected to read a local .env file — see the
# dotenv documentation).
Dotenv.load

# Connection settings. ENV.fetch raises KeyError when a variable is missing,
# so a misconfigured environment fails at startup rather than mid-run.
KAFKA_CLIENT_CERT = ENV.fetch("KAFKA_CLIENT_CERT")
KAFKA_CLIENT_CERT_KEY = ENV.fetch("KAFKA_CLIENT_CERT_KEY")
KAFKA_SERVER_CERT = ENV.fetch("KAFKA_SERVER_CERT")
KAFKA_URL = ENV.fetch("KAFKA_URL")

# The seed broker list is taken verbatim from KAFKA_URL.
KAFKA_BROKERS = KAFKA_URL

# Topic every worker subscribes to. Frozen so the constant cannot be mutated
# in place by accident.
KAFKA_TOPIC = "test-messages".freeze

# Number of consumer threads to spawn.
NUM_THREADS = 4

# Channel through which the worker threads report how many messages they
# have processed; the main thread pops from it.
queue = Queue.new

# Spawn NUM_THREADS consumer threads. Each one builds its own Kafka client,
# joins the "firehose" consumer group, subscribes to KAFKA_TOPIC and pushes
# a count onto `queue` after every 1000 messages it receives.
NUM_THREADS.times do |worker_id|
  Thread.new do
    # Set this as the very first thing inside the thread. Setting it from the
    # spawning thread after Thread.new has returned leaves a window in which a
    # worker may already have raised and died, and the flag has no effect on
    # a thread that has already terminated.
    Thread.current.abort_on_exception = true

    logger = Logger.new($stderr)
    logger.level = Logger::INFO

    # Tag each log line with the worker id; timestamp and progname are
    # intentionally left out of the output.
    logger.formatter = proc { |severity, _datetime, _progname, msg|
      "[#{worker_id}] #{severity.ljust(5)} -- #{msg}\n"
    }

    kafka = Kafka.new(
      seed_brokers: KAFKA_BROKERS,
      logger: logger,
      connect_timeout: 30,
      socket_timeout: 30,
      ssl_client_cert: KAFKA_CLIENT_CERT,
      ssl_client_cert_key: KAFKA_CLIENT_CERT_KEY,
      ssl_ca_cert: KAFKA_SERVER_CERT,
    )

    consumer = kafka.consumer(group_id: "firehose")
    consumer.subscribe(KAFKA_TOPIC)

    # Messages seen since the last report to the main thread.
    pending = 0

    consumer.each_message do |_message|
      pending += 1

      # Report in batches of 1000 rather than once per message.
      if (pending % 1000).zero?
        queue << pending
        pending = 0
      end

      # NOTE(review): presumably stands in for per-message processing time.
      sleep 0.01
    end
  end
end

# Running total of messages reported by all workers.
total_received = 0

# Block until a worker reports a batch, add it to the total and print the
# new total. Runs until the process is terminated.
loop do
  batch = queue.pop
  total_received += batch
  puts "===> Received #{total_received} messages"
end

Version data entries

51 entries across 51 versions & 1 rubygem

Version Path
ruby-kafka-0.6.0.beta4 examples/firehose-consumer.rb
ruby-kafka-0.6.0.beta3 examples/firehose-consumer.rb
ruby-kafka-0.6.0.beta2 examples/firehose-consumer.rb
ruby-kafka-0.6.0.beta1 examples/firehose-consumer.rb
ruby-kafka-0.5.5 examples/firehose-consumer.rb
ruby-kafka-0.5.4 examples/firehose-consumer.rb
ruby-kafka-0.5.4.beta1 examples/firehose-consumer.rb
ruby-kafka-0.5.3 examples/firehose-consumer.rb
ruby-kafka-0.5.2 examples/firehose-consumer.rb
ruby-kafka-0.5.2.beta3 examples/firehose-consumer.rb
ruby-kafka-0.5.2.beta2 examples/firehose-consumer.rb
ruby-kafka-0.5.2.beta1 examples/firehose-consumer.rb
ruby-kafka-0.5.1 examples/firehose-consumer.rb
ruby-kafka-0.5.1.beta2 examples/firehose-consumer.rb
ruby-kafka-0.5.1.beta1 examples/firehose-consumer.rb
ruby-kafka-0.4.4 examples/firehose-consumer.rb
ruby-kafka-0.5.0 examples/firehose-consumer.rb
ruby-kafka-0.5.0.beta6 examples/firehose-consumer.rb
ruby-kafka-0.5.0.beta5 examples/firehose-consumer.rb
ruby-kafka-0.5.0.beta4 examples/firehose-consumer.rb