Sha256: 5bfa7add6928ec886b284bd9d429f7e33598640a3f279429c9847641df58203e
Contents?: true
Size: 1.07 KB
Versions: 13
Compression:
Stored size: 1.07 KB
Contents
module Rdkafka class Producer class Client def initialize(native) @native = native # Start thread to poll client for delivery callbacks @polling_thread = Thread.new do loop do Rdkafka::Bindings.rd_kafka_poll(native, 250) # Exit thread if closing and the poll queue is empty if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(native) == 0 break end end end @polling_thread.abort_on_exception = true @polling_thread[:closing] = false end def native @native end def finalizer ->(_) { close } end def closed? @native.nil? end def close(object_id=nil) return unless @native # Indicate to polling thread that we're closing @polling_thread[:closing] = true # Wait for the polling thread to finish up @polling_thread.join Rdkafka::Bindings.rd_kafka_destroy(@native) @native = nil end end end end
Version data entries
13 entries across 13 versions & 2 rubygems