Sha256: fbbd2fb25b21ae58f82c23e953f66bc24495347d3256d953a94689f20cac8ac0

Contents?: true

Size: 1.01 KB

Versions: 7

Compression:

Stored size: 1.01 KB

Contents

# Consumes lines from a Kafka partition and writes them to STDOUT.
#
# You need to define the environment variable KAFKA_BROKERS for this
# to work, e.g.
#
#     export KAFKA_BROKERS=localhost:9092
#

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

# We don't want log output to clutter the console. Replace `StringIO.new`
# with e.g. `$stderr` if you want to see what's happening under the hood.
logger = Logger.new(StringIO.new)

brokers = ENV.fetch("KAFKA_BROKERS")

# Make sure to create this topic in your Kafka cluster or configure the
# cluster to auto-create topics.
topic = "text"

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: "simple-consumer",
  socket_timeout: 20,
  logger: logger,
)

begin
  offset = :latest
  partition = 0

  loop do
    messages = kafka.fetch_messages(
      topic: topic,
      partition: partition,
      offset: offset
    )

    messages.each do |message|
      puts message.value
      offset = message.offset + 1
    end
  end
ensure
  kafka.close
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
ruby-kafka-0.3.10 examples/simple-consumer.rb
ruby-kafka-0.3.9 examples/simple-consumer.rb
ruby-kafka-0.3.8 examples/simple-consumer.rb
ruby-kafka-0.3.7 examples/simple-consumer.rb
ruby-kafka-0.3.6 examples/simple-consumer.rb
ruby-kafka-0.3.5 examples/simple-consumer.rb
ruby-kafka-0.3.4 examples/simple-consumer.rb