Sha256: 0e2e3f5d16cfde0776ca3abdbf7ab1a498989bc09bd7f9194790d0c083eb19d9

Contents?: true

Size: 1.17 KB

Versions: 2

Compression:

Stored size: 1.17 KB

Contents

# frozen_string_literal: true

module Rdkafka
  # @private
  # A wrapper around a native kafka that polls and cleanly exits
  #
  # Constructing an instance starts a background thread that keeps polling the
  # native handle. {#close} flushes the handle, asks that thread to stop, waits
  # for it and then destroys the handle. Closing is serialized so the handle is
  # destroyed at most once, even when several threads call {#close} together.
  class NativeKafka
    # Timeout value handed to every rd_kafka_poll call of the polling thread
    POLL_TIMEOUT_MS = 250

    # Timeout value handed to rd_kafka_flush while closing
    FLUSH_TIMEOUT_MS = 30 * 1000

    # @return [Object, nil] the wrapped native handle, nil once closed
    attr_reader :inner

    # @param inner [Object] native handle passed on to the Rdkafka::Bindings calls
    def initialize(inner)
      @inner = inner
      # Guards #close: without it two callers can both pass the closed? check
      # and both destroy the native handle
      @close_mutex = Mutex.new

      # Start thread to poll client for delivery callbacks
      @polling_thread = Thread.new do
        loop do
          Rdkafka::Bindings.rd_kafka_poll(inner, POLL_TIMEOUT_MS)
          # Exit thread if closing and the poll queue is empty
          break if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(inner).zero?
        end
      end
      @polling_thread.abort_on_exception = true
      @polling_thread[:closing] = false
    end

    # Builds a callable suitable for use as an object finalizer.
    #
    # @return [Proc] lambda that ignores its single argument and closes this wrapper
    def finalizer
      ->(_) { close }
    end

    # @return [Boolean] true once the native handle has been destroyed
    def closed?
      @inner.nil?
    end

    # Flushes outstanding activity, stops the polling thread and destroys the
    # native handle. Safe to call repeatedly and from several threads; only the
    # first call does any work.
    #
    # @param object_id [Object, nil] ignored; accepted so the method can be
    #   invoked with the argument a finalizer receives
    # @return [nil]
    def close(object_id = nil)
      # A nested call on the thread that is already closing (for instance from
      # code invoked while flushing) must not recurse or try to re-lock
      return if closed? || @close_mutex.owned?

      @close_mutex.synchronize do
        # Another thread may have finished closing while we waited for the lock
        next if closed?

        # Flush outstanding activity
        Rdkafka::Bindings.rd_kafka_flush(@inner, FLUSH_TIMEOUT_MS)

        # Indicate to polling thread that we're closing
        @polling_thread[:closing] = true
        # Wait for the polling thread to finish up
        @polling_thread.join

        Rdkafka::Bindings.rd_kafka_destroy(@inner)

        @inner = nil
      end

      nil
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rdkafka-0.13.0.beta.2 lib/rdkafka/native_kafka.rb
rdkafka-0.13.0.beta.1 lib/rdkafka/native_kafka.rb