Sha256: 11287ac791ed6e8e9c1eb01ed230d46e030d67c2544f94753913ff199752cbda

Contents?: true

Size: 739 Bytes

Versions: 48

Compression:

Stored size: 739 Bytes

Contents

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new(STDOUT)
brokers = ENV.fetch("KAFKA_BROKERS", "localhost:9092").split(",")

# Make sure to create this topic in your Kafka cluster or configure the
# cluster to auto-create topics.
topic = "text"

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: "test",
  socket_timeout: 20,
  logger: logger,
)

consumer = kafka.consumer(group_id: "test")
consumer.subscribe(topic)

trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }

begin
  consumer.each_message do |message|
  end
rescue Kafka::ProcessingError => e
  warn "Got #{e.cause}"
  consumer.pause(e.topic, e.partition, timeout: 20)

  retry
end

Version data entries

48 entries across 48 versions & 3 rubygems

Version Path
ruby-kafka-1.5.0 examples/consumer-group.rb
ruby-kafka-aws-iam-1.4.5 examples/consumer-group.rb
ruby-kafka-aws-iam-1.4.4 examples/consumer-group.rb
ruby-kafka-aws-iam-1.4.3 examples/consumer-group.rb
ruby-kafka-aws-iam-1.4.2 examples/consumer-group.rb
ruby-kafka-aws-iam-1.4.1 examples/consumer-group.rb
ruby-kafka-1.4.0 examples/consumer-group.rb
ruby-kafka-temp-fork-0.0.2 examples/consumer-group.rb
ruby-kafka-temp-fork-0.0.1 examples/consumer-group.rb
ruby-kafka-1.3.0 examples/consumer-group.rb
ruby-kafka-1.2.0 examples/consumer-group.rb
ruby-kafka-1.1.0 examples/consumer-group.rb
ruby-kafka-1.1.0.beta1 examples/consumer-group.rb
ruby-kafka-1.0.0 examples/consumer-group.rb
ruby-kafka-0.7.10 examples/consumer-group.rb
ruby-kafka-0.7.9 examples/consumer-group.rb
ruby-kafka-0.7.8 examples/consumer-group.rb
ruby-kafka-0.7.7 examples/consumer-group.rb
ruby-kafka-0.7.6 examples/consumer-group.rb
ruby-kafka-0.7.6.beta2 examples/consumer-group.rb