Sha256: c0cea723d7f9f40ffaa967bac762b6a13765059b7565adb22cb030261fc0dbed

Contents?: true

Size: 1.35 KB

Versions: 48

Compression:

Stored size: 1.35 KB

Contents

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"
require "dotenv"

Dotenv.load

KAFKA_CLIENT_CERT = ENV.fetch("KAFKA_CLIENT_CERT")
KAFKA_CLIENT_CERT_KEY = ENV.fetch("KAFKA_CLIENT_CERT_KEY")
KAFKA_SERVER_CERT = ENV.fetch("KAFKA_SERVER_CERT")
KAFKA_URL = ENV.fetch("KAFKA_URL")
KAFKA_BROKERS = KAFKA_URL
KAFKA_TOPIC = "test-messages"

NUM_THREADS = 4

queue = Queue.new

threads = NUM_THREADS.times.map do |worker_id|
  Thread.new do
    logger = Logger.new($stderr)
    logger.level = Logger::INFO

    logger.formatter = proc {|severity, datetime, progname, msg|
      "[#{worker_id}] #{severity.ljust(5)} -- #{msg}\n"
    }

    kafka = Kafka.new(
      seed_brokers: KAFKA_BROKERS,
      logger: logger,
      connect_timeout: 30,
      socket_timeout: 30,
      ssl_client_cert: KAFKA_CLIENT_CERT,
      ssl_client_cert_key: KAFKA_CLIENT_CERT_KEY,
      ssl_ca_cert: KAFKA_SERVER_CERT,
    )

    consumer = kafka.consumer(group_id: "firehose")
    consumer.subscribe(KAFKA_TOPIC)

    i = 0
    consumer.each_message do |message|
      i += 1

      if i % 1000 == 0
        queue << i
        i = 0
      end

      sleep 0.01
    end
  end
end

threads.each {|t| t.abort_on_exception = true }

received_messages = 0

loop do
  received_messages += queue.pop
  puts "===> Received #{received_messages} messages"
end

Version data entries

48 entries across 48 versions & 3 rubygems

Version Path
ruby-kafka-1.5.0 examples/firehose-consumer.rb
ruby-kafka-aws-iam-1.4.5 examples/firehose-consumer.rb
ruby-kafka-aws-iam-1.4.4 examples/firehose-consumer.rb
ruby-kafka-aws-iam-1.4.3 examples/firehose-consumer.rb
ruby-kafka-aws-iam-1.4.2 examples/firehose-consumer.rb
ruby-kafka-aws-iam-1.4.1 examples/firehose-consumer.rb
ruby-kafka-1.4.0 examples/firehose-consumer.rb
ruby-kafka-temp-fork-0.0.2 examples/firehose-consumer.rb
ruby-kafka-temp-fork-0.0.1 examples/firehose-consumer.rb
ruby-kafka-1.3.0 examples/firehose-consumer.rb
ruby-kafka-1.2.0 examples/firehose-consumer.rb
ruby-kafka-1.1.0 examples/firehose-consumer.rb
ruby-kafka-1.1.0.beta1 examples/firehose-consumer.rb
ruby-kafka-1.0.0 examples/firehose-consumer.rb
ruby-kafka-0.7.10 examples/firehose-consumer.rb
ruby-kafka-0.7.9 examples/firehose-consumer.rb
ruby-kafka-0.7.8 examples/firehose-consumer.rb
ruby-kafka-0.7.7 examples/firehose-consumer.rb
ruby-kafka-0.7.6 examples/firehose-consumer.rb
ruby-kafka-0.7.6.beta2 examples/firehose-consumer.rb