Sha256: daf9d3ff93d58b8e521652d41ea9dd9d4baf9df6adabfa1ca6b4f8d3de41937f

Contents?: true

Size: 1.79 KB

Versions: 13

Compression:

Stored size: 1.79 KB

Contents

#
# This example assumes you want to create a threaded kafka generator which
# publish a stream of kafka messages without consuming them. It also shows
# what happens when you produce more messages than the producer can handle.
#
require 'bundler/setup'
require 'json'
require 'phobos'

TOPIC = 'test-partitions'

Phobos.configure('config/phobos.yml')

class MyProducer
  include Phobos::Producer
end

#
# Trapping signals to properly stop this generator
#
@stop = false
%i( INT TERM QUIT ).each do |signal|
  Signal.trap(signal) do
    puts "Stopping"
    @stop = true
  end
end

Thread.new do
  begin
    total = 1

    loop do
      break if @stop
      key = SecureRandom.uuid
      payload = Time.now.utc.to_json

      begin
        # Producer will use phobos configuration to create a kafka client and
        # a producer and it will bind both to the current thread, so it's safe
        # to call class methods here
        #
        MyProducer
          .producer
          .async_publish(TOPIC, payload, key)

        puts "produced #{key}, total: #{total}"

      # Since this is very simplistic code, we are going to generate more messages than
      # the producer can write to Kafka. Eventually we'll get some buffer overflows
      #
      rescue Kafka::BufferOverflow => e
        puts "| waiting"
        sleep(1)
        retry
      end

      total += 1
    end
  ensure
    #
    # Before we stop we must shutdown the async producer to ensure that all messages
    # are delivered
    #
    MyProducer
      .producer
      .async_producer_shutdown

    #
    # Since no client was configured (we can do this with `MyProducer.producer.configure_kafka_client`)
    # we must get the auto generated one and close it properly
    #
    MyProducer
      .producer
      .kafka_client
      .close
  end
end.join

Version data entries

13 entries across 13 versions & 1 rubygems

Version Path
phobos-1.8.0 examples/publishing_messages_without_consumer.rb
phobos-1.7.2 examples/publishing_messages_without_consumer.rb
phobos-1.7.1 examples/publishing_messages_without_consumer.rb
phobos-1.7.0 examples/publishing_messages_without_consumer.rb
phobos-1.6.1 examples/publishing_messages_without_consumer.rb
phobos-1.6.0 examples/publishing_messages_without_consumer.rb
phobos-1.5.0 examples/publishing_messages_without_consumer.rb
phobos-1.4.2 examples/publishing_messages_without_consumer.rb
phobos-1.4.1 examples/publishing_messages_without_consumer.rb
phobos-1.4.0 examples/publishing_messages_without_consumer.rb
phobos-1.3.0 examples/publishing_messages_without_consumer.rb
phobos-1.2.1 examples/publishing_messages_without_consumer.rb
phobos-1.2.0 examples/publishing_messages_without_consumer.rb