Sha256: 6d80ee653e9e430f95f2df890bf75846082a47fb97553df9cec1454e7e70bc46

Contents?: true

Size: 1.85 KB

Versions: 18

Compression:

Stored size: 1.85 KB

Contents

# frozen_string_literal: true

#
# This example assumes you want to create a threaded kafka generator which
# publish a stream of kafka messages without consuming them. It also shows
# what happens when you produce more messages than the producer can handle.
#
require 'bundler/setup'
require 'json'
require 'phobos'

TOPIC = 'test-partitions'

Phobos.configure('config/phobos.yml')

class MyProducer
  include Phobos::Producer
end

#
# Trapping signals to properly stop this generator
#
@stop = false
[:INT, :TERM, :QUIT].each do |signal|
  Signal.trap(signal) do
    puts 'Stopping'
    @stop = true
  end
end

Thread.new do
  begin
    total = 1

    loop do
      break if @stop

      key = SecureRandom.uuid
      payload = Time.now.utc.to_json

      begin
        # Producer will use phobos configuration to create a kafka client and
        # a producer and it will bind both to the current thread, so it's safe
        # to call class methods here
        #
        MyProducer
          .producer
          .async_publish(topic: TOPIC, payload: payload, key: key)

        puts "produced #{key}, total: #{total}"

      # Since this is very simplistic code, we are going to generate more messages than
      # the producer can write to Kafka. Eventually we'll get some buffer overflows
      #
      rescue Kafka::BufferOverflow => e
        puts '| waiting'
        sleep(1)
        retry
      end

      total += 1
    end
  ensure
    #
    # Before we stop we must shutdown the async producer to ensure that all messages
    # are delivered
    #
    MyProducer
      .producer
      .async_producer_shutdown

    #
    # Since no client was configured (we can do this with
    #   `MyProducer.producer.configure_kafka_client`)
    # we must get the auto generated one and close it properly
    #
    MyProducer
      .producer
      .kafka_client
      .close
  end
end.join

Version data entries

18 entries across 18 versions & 2 rubygems

Version Path
phobos-2.1.6 examples/publishing_messages_without_consumer.rb
phobos-2.1.5 examples/publishing_messages_without_consumer.rb
phobos-2.1.4 examples/publishing_messages_without_consumer.rb
phobos-2.1.3 examples/publishing_messages_without_consumer.rb
phobos-2.1.2 examples/publishing_messages_without_consumer.rb
phobos-2.1.1 examples/publishing_messages_without_consumer.rb
phobos_temp_fork-0.0.4 examples/publishing_messages_without_consumer.rb
phobos_temp_fork-0.0.3 examples/publishing_messages_without_consumer.rb
phobos_temp_fork-0.0.2 examples/publishing_messages_without_consumer.rb
phobos_temp_fork-0.0.1 examples/publishing_messages_without_consumer.rb
phobos-2.1.0 examples/publishing_messages_without_consumer.rb
phobos-2.0.2 examples/publishing_messages_without_consumer.rb
phobos-2.0.1 examples/publishing_messages_without_consumer.rb
phobos-2.0.0.pre.beta1 examples/publishing_messages_without_consumer.rb
phobos-1.9.0 examples/publishing_messages_without_consumer.rb
phobos-1.9.0.pre.beta3 examples/publishing_messages_without_consumer.rb
phobos-1.9.0.pre.beta2 examples/publishing_messages_without_consumer.rb
phobos-1.9.0.pre.beta1 examples/publishing_messages_without_consumer.rb