Sha256: 57b02e7cae546578fadc766f6c264235bd21491c736142a844050fd04200f964

Contents?: true

Size: 1.77 KB

Versions: 1

Compression:

Stored size: 1.77 KB

Contents

#
# This example assumes you want to create a threaded kafka generator which
# publish a stream of kafka messages without consuming them. It also shows
# what happens when you produce more messages than the producer can handle.
#
require 'bundler/setup'
require 'phobos'

TOPIC = 'test-partitions'

Phobos.configure('config/phobos.yml')

class MyProducer
  include Phobos::Producer
end

#
# Trapping signals to properly stop this generator
#
@stop = false
%i( INT TERM QUIT ).each do |signal|
  Signal.trap(signal) do
    puts "Stopping"
    @stop = true
  end
end

Thread.new do
  begin
    total = 1

    loop do
      break if @stop
      key = SecureRandom.uuid
      payload = Time.now.utc.to_json

      begin
        # Producer will use phobos configuration to create a kafka client and
        # a producer and it will bind both to the current thread, so it's safe
        # to call class methods here
        #
        MyProducer
          .producer
          .async_publish(TOPIC, payload, key)

        puts "produced #{key}, total: #{total}"

      # Since this is very simplistic code, we are going to generate more messages than
      # the producer can write to Kafka. Eventually we'll get some buffer overflows
      #
      rescue Kafka::BufferOverflow => e
        puts "| waiting"
        sleep(1)
        retry
      end

      total += 1
    end
  ensure
    #
    # Before we stop we must shutdown the async producer to ensure that all messages
    # are delivered
    #
    MyProducer
      .producer
      .async_producer_shutdown

    #
    # Since no client was configured (we can do this with `MyProducer.producer.configure_kafka_client`)
    # we must get the auto generated one and close it properly
    #
    MyProducer
      .producer
      .kafka_client
      .close
  end
end.join

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
phobos-1.1.0 examples/publishing_messages_without_consumer.rb