Sha256: 94b38f14ba89fbbee7deda64c3e631314f9e65c1f24f342120025629cdc144ee

Contents?: true

Size: 1.55 KB

Versions: 1

Compression:

Stored size: 1.55 KB

Contents

require "bundler/setup"
require "phobos"

TOPIC = 'test-partitions'

Phobos.configure('config/phobos.yml')

class MyProducer
  include Phobos::Producer
end

#
# Trapping signals to properly stop this generator
#
@stop = false
%i( INT TERM QUIT ).each do |signal|
  Signal.trap(signal) do
    puts "Stopping"
    @stop = true
  end
end

Thread.new do
  begin
    total = 1

    loop do
      break if @stop
      key = SecureRandom.uuid
      payload = Time.now.utc.to_json

      begin
        # Producer will use phobos configuration to create a kafka client and
        # a producer and it will bind both to the current thread, so it's safe
        # to call class methods here
        #
        MyProducer
          .producer
          .async_publish(TOPIC, payload, key)

        puts "produced #{key}, total: #{total}"

      # Since this is a simplistic code we are going to generate more messages than
      # the producer can write to Kafka, so eventually we'll get some buffer overflows
      #
      rescue Kafka::BufferOverflow => e
        puts "| waiting"
        sleep(1)
        retry
      end

      total += 1
    end
  ensure
    #
    # Before we stop we must shutdown the async producer to ensure that all messages
    # are delivered
    #
    MyProducer
      .producer
      .async_producer_shutdown

    #
    # Since no client was configured (we can do this with `MyProducer.producer.configure_kafka_client`)
    # we must get the auto generated one and close it properly
    #
    MyProducer
      .producer
      .kafka_client
      .close
  end
end.join

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
phobos-1.0.0 examples/publishing_messages_without_consumer.rb.rb