Sha256: 46a9fdc365d7fb667cc42485f0abe622c60687e455a108747865c870f121674d

Contents?: true

Size: 1.83 KB

Versions: 6

Compression:

Stored size: 1.83 KB

Contents

# frozen_string_literal: true

#
# This example assumes you want to create a threaded kafka generator which
# publishes a stream of kafka messages without consuming them. It also shows
# what happens when you produce more messages than the producer can handle.
#
require 'bundler/setup'
require 'json'
require 'securerandom'
require 'phobos'

# Topic that every generated message is published to
TOPIC = 'test-partitions'

# Load the Phobos settings before any producer is used.
# NOTE(review): the path is relative - presumably resolved against the current
# working directory, so the script must be started from the right place; confirm
Phobos.configure('config/phobos.yml')

# Minimal producer used by this example. It defines nothing of its own; the
# `MyProducer.producer` API called further down comes from the
# Phobos::Producer mixin
class MyProducer
  include Phobos::Producer
end

#
# Graceful shutdown: INT, TERM and QUIT only raise the @stop flag; the
# generator loop polls that flag and leaves on its own
#
@stop = false

request_stop = proc do
  puts 'Stopping'
  @stop = true
end

%i[INT TERM QUIT].each { |signal_name| Signal.trap(signal_name, &request_stop) }

#
# Run the generator in its own thread and block the main thread until it ends.
# Messages are published in a tight loop until a signal handler sets @stop
#
Thread.new do
  begin
    # Sequence number of the message currently being produced (starts at 1)
    total = 1

    loop do
      break if @stop

      key = SecureRandom.uuid
      payload = Time.now.utc.to_json

      begin
        # Producer will use phobos configuration to create a kafka client and
        # a producer and it will bind both to the current thread, so it's safe
        # to call class methods here
        #
        MyProducer
          .producer
          .async_publish(TOPIC, payload, key)

        puts "produced #{key}, total: #{total}"

      # Since this is very simplistic code, we are going to generate more messages than
      # the producer can write to Kafka. Eventually we'll get some buffer overflows
      #
      rescue Kafka::BufferOverflow
        puts '| waiting'
        sleep(1)

        # Give up on this message when a stop was requested while we were
        # waiting; otherwise an overflow that never clears would keep retrying
        # forever and the generator could not be stopped
        break if @stop

        retry
      end

      # Only reached after a successful publish
      total += 1
    end
  ensure
    begin
      #
      # Before we stop we must shutdown the async producer to ensure that all messages
      # are delivered
      #
      MyProducer
        .producer
        .async_producer_shutdown
    ensure
      #
      # Since no client was configured (we can do this with
      #   `MyProducer.producer.configure_kafka_client`)
      # we must get the auto generated one and close it properly.
      # Nested in its own `ensure` so the client is closed even when the
      # producer shutdown above raises
      #
      MyProducer
        .producer
        .kafka_client
        .close
    end
  end
end.join

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
phobos-1.8.3.pre.beta2 examples/publishing_messages_without_consumer.rb
phobos-1.8.3.pre.beta1 examples/publishing_messages_without_consumer.rb
phobos-1.8.2 examples/publishing_messages_without_consumer.rb
phobos-1.8.2.pre.beta2 examples/publishing_messages_without_consumer.rb
phobos-1.8.2.pre.beta1 examples/publishing_messages_without_consumer.rb
phobos-1.8.1 examples/publishing_messages_without_consumer.rb