Sha256: 57b02e7cae546578fadc766f6c264235bd21491c736142a844050fd04200f964
Size: 1.77 KB
Versions: 1
Compression:
Stored size: 1.77 KB
Contents
#
# This example assumes you want to create a threaded kafka generator which
# publish a stream of kafka messages without consuming them. It also shows
# what happens when you produce more messages than the producer can handle.
#
require 'bundler/setup'
require 'phobos'

TOPIC = 'test-partitions'

Phobos.configure('config/phobos.yml')

class MyProducer
  include Phobos::Producer
end

#
# Trapping signals to properly stop this generator
#
@stop = false
%i( INT TERM QUIT ).each do |signal|
  Signal.trap(signal) do
    puts "Stopping"
    @stop = true
  end
end

Thread.new do
  begin
    total = 1

    loop do
      break if @stop
      key = SecureRandom.uuid
      payload = Time.now.utc.to_json

      begin
        # Producer will use phobos configuration to create a kafka client and
        # a producer and it will bind both to the current thread, so it's safe
        # to call class methods here
        #
        MyProducer
          .producer
          .async_publish(TOPIC, payload, key)

        puts "produced #{key}, total: #{total}"

      # Since this is very simplistic code, we are going to generate more messages than
      # the producer can write to Kafka. Eventually we'll get some buffer overflows
      #
      rescue Kafka::BufferOverflow => e
        puts "| waiting"
        sleep(1)
        retry
      end

      total += 1
    end
  ensure
    #
    # Before we stop we must shutdown the async producer to ensure that all messages
    # are delivered
    #
    MyProducer
      .producer
      .async_producer_shutdown

    #
    # Since no client was configured (we can do this with `MyProducer.producer.configure_kafka_client`)
    # we must get the auto generated one and close it properly
    #
    MyProducer
      .producer
      .kafka_client
      .close
  end
end.join
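The rescue/sleep/retry handling around `Kafka::BufferOverflow` in the example above can be tried without a Kafka broker. The following is only a minimal sketch of that backpressure pattern, under stated assumptions: a standard-library `SizedQueue` stands in for the producer's buffer, `ThreadError` stands in for `Kafka::BufferOverflow`, and the helper `publish_with_backoff` with its `wait` and `max_attempts` parameters is hypothetical and not part of Phobos or ruby-kafka.

```ruby
# Stand-in for the producer's internal buffer. SizedQueue comes from Ruby's
# standard library; it is an assumption for illustration, not what Phobos uses.
BUFFER = SizedQueue.new(3)

# Hypothetical helper: push without blocking and back off while the buffer is full.
# Returns the number of attempts it took to enqueue the message.
def publish_with_backoff(buffer, message, wait: 0.01, max_attempts: 500)
  attempts = 0
  begin
    attempts += 1
    buffer.push(message, true) # non_block = true raises ThreadError when full
  rescue ThreadError
    raise if attempts >= max_attempts # give up instead of retrying forever
    sleep(wait)
    retry
  end
  attempts
end

# A deliberately slow consumer, so the buffer fills up and pushes have to wait.
delivered = []
consumer = Thread.new do
  while (message = BUFFER.pop) != :done
    sleep(0.005)
    delivered << message
  end
end

attempts = (1..10).map { |i| publish_with_backoff(BUFFER, "message-#{i}") }
publish_with_backoff(BUFFER, :done) # sentinel that tells the consumer to stop
consumer.join

puts "delivered #{delivered.size} messages in #{attempts.sum} push attempts"
```

Unlike the original loop, this sketch caps the number of retries, so a buffer that never drains surfaces as an error rather than an endless wait. Whether a cap is appropriate depends on how the generator is meant to behave when Kafka is unavailable.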
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
phobos-1.1.0 | examples/publishing_messages_without_consumer.rb |