Sha256: b648d84dcc43c0308ab55b554fb7ffa2eedeba24ce13b730c1202f4a1f96d2e7
Contents?: true
Size: 729 Bytes
Versions: 1
Compression:
Stored size: 729 Bytes
Contents
# This is required for `rdkafka` version >= 0.12.0
#
# Reopens the producer client so that closing accepts a time limit; once the
# limit expires the native handle is destroyed anyway (a forced close).
class Rdkafka::Producer::Client
  # Closes the client, waiting at most `timeout` for the polling thread.
  #
  # @param timeout [Numeric, nil] limit handed to Thread#join; nil means the
  #   join waits without a limit
  # @return [Boolean, nil] true if the polling thread finished within the
  #   limit, false if the client was forcefully closed, nil if @native was
  #   already unset when called
  def close(timeout=nil)
    return unless @native

    # Tell the polling thread that shutdown has started
    @polling_thread[:closing] = true

    # Thread#join yields nil when the limit expires before the thread ends
    finished = !@polling_thread.join(timeout).nil?

    # The handle is destroyed whether or not the join completed
    Rdkafka::Bindings.rd_kafka_destroy(@native)
    @native = nil

    finished
  end
end

# Reopens the producer so that callers can pass the time limit through.
class Rdkafka::Producer
  # Removes this object's finalizer, then closes the underlying client.
  #
  # @param timeout [Numeric, nil] forwarded unchanged to the client's close
  # @return [Boolean, nil] whatever the client's close returns
  def close(timeout = nil)
    ObjectSpace.undefine_finalizer(self)
    @client.close(timeout)
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
fluent-plugin-kafka-0.19.3 | lib/fluent/plugin/rdkafka_patch/0_12_0.rb |