Sha256: 0e2e3f5d16cfde0776ca3abdbf7ab1a498989bc09bd7f9194790d0c083eb19d9
Contents?: true
Size: 1.17 KB
Versions: 2
Compression:
Stored size: 1.17 KB
Contents
# frozen_string_literal: true

module Rdkafka
  # @private
  # A wrapper around a native kafka that polls and cleanly exits.
  #
  # A background thread polls the wrapped handle until {#close} asks it to
  # stop and the handle's out queue is empty; {#close} then destroys the handle.
  class NativeKafka
    # @return [Object, nil] the wrapped native handle, or nil once {#close}
    #   has destroyed it
    attr_reader :inner

    # @param inner [Object] native client handle that is handed to the
    #   Rdkafka::Bindings calls (presumably an FFI pointer; not visible here)
    def initialize(inner)
      @inner = inner
      # Serializes #close so that overlapping callers (e.g. an explicit close
      # and a finalizer) cannot both destroy the same native handle.
      @close_mutex = Mutex.new

      # Start thread to poll client for delivery callbacks
      @polling_thread = Thread.new do
        loop do
          # Second argument is the poll timeout in milliseconds
          Rdkafka::Bindings.rd_kafka_poll(inner, 250)
          # Exit thread if closing and the poll queue is empty
          break if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(inner) == 0
        end
      end
      @polling_thread.abort_on_exception = true
      # Set from the creating thread (not inside the polling thread) so a
      # later close request can never be overwritten by a late-starting thread.
      @polling_thread[:closing] = false
    end

    # Builds a callable that closes this wrapper and ignores its argument.
    #
    # @return [Proc] lambda taking one (unused) argument
    def finalizer
      ->(_) { close }
    end

    # @return [Boolean] true once the native handle has been destroyed
    def closed?
      @inner.nil?
    end

    # Flushes outstanding activity, stops the polling thread and destroys the
    # native handle. Safe to call repeatedly and from several threads: only
    # the first caller performs the teardown, every other call returns early.
    #
    # @param object_id [Object, nil] unused; kept so existing callers that
    #   pass an argument keep working
    # @return [nil]
    def close(object_id = nil)
      @close_mutex.synchronize do
        return if closed?

        # Flush outstanding activity (timeout in milliseconds: 30 seconds)
        Rdkafka::Bindings.rd_kafka_flush(@inner, 30 * 1000)

        # Indicate to polling thread that we're closing
        @polling_thread[:closing] = true
        # Wait for the polling thread to finish up
        @polling_thread.join

        Rdkafka::Bindings.rd_kafka_destroy(@inner)
        @inner = nil
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
rdkafka-0.13.0.beta.2 | lib/rdkafka/native_kafka.rb |
rdkafka-0.13.0.beta.1 | lib/rdkafka/native_kafka.rb |