Sha256: 5bfa7add6928ec886b284bd9d429f7e33598640a3f279429c9847641df58203e

Contents?: true

Size: 1.07 KB

Versions: 13

Compression:

Stored size: 1.07 KB

Contents

module Rdkafka
  class Producer
    # Holds a native producer handle together with the background thread that
    # keeps polling it, and releases both when {#close} is called.
    class Client
      # @return [Object, nil] the wrapped native handle; nil once {#close} has run
      attr_reader :native

      # Stores the handle and launches the polling thread.
      #
      # @param native the native handle passed through to Rdkafka::Bindings
      def initialize(native)
        @native = native

        # Background poller: servicing the handle is what lets delivery
        # callbacks run. 250 is the timeout argument handed to rd_kafka_poll.
        @polling_thread = Thread.new do
          loop do
            Rdkafka::Bindings.rd_kafka_poll(native, 250)

            # Keep going until #close has flagged this thread as closing ...
            next unless Thread.current[:closing]

            # ... and even then only stop once the out queue has drained.
            break if Rdkafka::Bindings.rd_kafka_outq_len(native) == 0
          end
        end
        # An exception inside the poller should not go unnoticed.
        @polling_thread.abort_on_exception = true
        @polling_thread[:closing] = false
      end

      # Builds a callable that closes this client; it takes (and ignores) a
      # single argument.
      #
      # @return [Proc] a one-argument lambda that invokes {#close}
      def finalizer
        lambda { |_object_id| close }
      end

      # @return [Boolean] true once the native handle has been released
      def closed?
        @native.nil?
      end

      # Stops the polling thread, destroys the native handle and forgets it.
      # Does nothing when the handle is already gone.
      #
      # @param object_id ignored by this method
      # @return [nil]
      def close(object_id=nil)
        if @native
          poller = @polling_thread

          # Tell the poller to wind down, then block until it has exited.
          poller[:closing] = true
          poller.join

          Rdkafka::Bindings.rd_kafka_destroy(@native)

          @native = nil
        end
      end
    end
  end
end

Version data entries

13 entries across 13 versions & 2 rubygems

Version Path
rdkafka-0.12.1 lib/rdkafka/producer/client.rb
karafka-rdkafka-0.12.4 lib/rdkafka/producer/client.rb
karafka-rdkafka-0.12.3 lib/rdkafka/producer/client.rb
karafka-rdkafka-0.12.2 lib/rdkafka/producer/client.rb
karafka-rdkafka-0.12.1 lib/rdkafka/producer/client.rb
karafka-rdkafka-0.12.1.beta1 lib/rdkafka/producer/client.rb
karafka-rdkafka-0.12.0 lib/rdkafka/producer/client.rb
rdkafka-0.12.0 lib/rdkafka/producer/client.rb
rdkafka-0.12.0.beta.4 lib/rdkafka/producer/client.rb
rdkafka-0.12.0.beta.3 lib/rdkafka/producer/client.rb
rdkafka-0.12.0.beta.2 lib/rdkafka/producer/client.rb
rdkafka-0.12.0.beta.1 lib/rdkafka/producer/client.rb
rdkafka-0.12.0.beta.0 lib/rdkafka/producer/client.rb