Sha256: cf99cc9ab33a625afb19d2a1aead2c5c33e0a5f280fa3152f3ade97e3828ca03

Contents?: true

Size: 1.26 KB

Versions: 1

Compression:

Stored size: 1.26 KB

Contents

class Rdkafka::NativeKafka
  # Shuts down the native client handle held in @inner.
  #
  # @param timeout [Numeric, nil] handed to the polling thread's +join+
  #   (with Thread#join semantics, nil means wait without a limit)
  # @param object_id [Object, nil] not referenced anywhere in this method;
  #   presumably accepted so the method can be invoked finalizer-style -- confirm with callers
  # @return [Boolean] false when the join on the polling thread returned nil
  #   (the wait gave up before the thread finished, i.e. a forced close);
  #   true otherwise, including when the handle was already closed
  def close(timeout=nil, object_id=nil)
    return true if closed?

    synchronize do
      # Let outside observers know that a close is underway
      @closing = true

      # :unknown is the placeholder used when there is no polling thread to wait for
      join_result =
        if @polling_thread
          # Tell the polling thread that it should wind down
          @polling_thread[:closing] = true

          # Wait for the polling thread to finish; in practice this wait
          # can be aborted when the code is running from a finalizer.
          @polling_thread.join(timeout)
        else
          :unknown
        end

      # Both mutexes are held from here on, before the client is destroyed
      @poll_mutex.lock

      # Guard against two threads entering close: once the first one finishes and
      # releases the lock, the second would otherwise carry on and destroy the
      # inner handle a second time.
      if @inner
        Rdkafka::Bindings.rd_kafka_destroy(@inner)
        @inner = nil
        @opaque = nil
      end

      # A nil join result means the wait on the polling thread was cut short
      forced = join_result.nil?
      !forced
    end
  end
end

class Rdkafka::Producer
  # Closes the producer by delegating to the underlying native handle.
  #
  # @param timeout [Object, nil] forwarded unchanged to +@native_kafka.close+
  # @return [Boolean] true when the producer is already closed; otherwise
  #   whatever +@native_kafka.close+ returns
  def close(timeout = nil)
    if closed?
      true
    else
      # Remove any finalizers attached to this object before closing explicitly
      ObjectSpace.undefine_finalizer(self)
      @native_kafka.close(timeout)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-kafka-0.19.3 lib/fluent/plugin/rdkafka_patch/0_14_0.rb