Sha256: b648d84dcc43c0308ab55b554fb7ffa2eedeba24ce13b730c1202f4a1f96d2e7

Contents?: true

Size: 729 Bytes

Versions: 1

Compression:

Stored size: 729 Bytes

Contents

# This is required for `rdkafka` version >= 0.12.0
# Overriding the close method in order to provide a time limit for when it should be forcibly closed
class Rdkafka::Producer::Client
  # Closes the native client, waiting at most +timeout+ seconds for the
  # polling thread to finish before the native handle is destroyed.
  #
  # @param timeout [Numeric, nil] seconds to wait for the polling thread;
  #   nil waits without a time limit (Thread#join semantics)
  # @return [Boolean] false if the producer was forcefully closed (the polling
  #   thread did not finish within +timeout+), true otherwise, including when
  #   the client had already been closed
  def close(timeout = nil)
    # Already closed: nothing is being forced, so report a clean close rather
    # than nil, which callers testing the result would read as "forced".
    return true unless @native

    # Indicate to polling thread that we're closing
    @polling_thread[:closing] = true
    # Wait for the polling thread to finish up. Thread#join returns the thread
    # itself on completion and nil when the timeout elapses first.
    joined = @polling_thread.join(timeout)

    # NOTE(review): the handle is destroyed even when the join timed out;
    # presumably the polling thread may still be running at that point.
    # Confirm this is safe against the rdkafka version in use.
    Rdkafka::Bindings.rd_kafka_destroy(@native)

    @native = nil

    !joined.nil?
  end
end

class Rdkafka::Producer
  # Closes this producer, passing the optional time limit on to the
  # underlying client.
  #
  # @param timeout [Numeric, nil] forwarded unchanged to the client's close
  # @return whatever the client's close returns
  def close(timeout = nil)
    # Remove the finalizer registered for this object before closing explicitly.
    ObjectSpace.undefine_finalizer(self)

    @client.close(timeout)
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-kafka-0.19.3 lib/fluent/plugin/rdkafka_patch/0_12_0.rb