Sha256: 84e460274afc8639859a7fd16f709de13b8871da8dea6b435c188e2012b0757f

Contents?: true

Size: 1.57 KB

Versions: 1

Compression:

Stored size: 1.57 KB

Contents

class Rdkafka::NativeKafka
  # Closes the native client, waiting at most +timeout+ seconds for the polling
  # thread to stop before the native handle is destroyed.
  #
  # Relies on +closed?+ and +synchronize+, which are defined outside this patch.
  #
  # @param timeout [Numeric, nil] limit handed to Thread#join; nil waits without limit
  # @param object_id [Object, nil] not used by this method; presumably kept so the
  #   signature stays compatible with finalizer-style callers (verify against callers)
  # @yield optional cleanup that runs right before the native handle is destroyed
  # @return [Boolean] false if the polling thread did not finish within +timeout+
  #   (i.e. the client was closed forcefully), true otherwise
  def close(timeout=nil, object_id=nil)
    return true if closed?

    synchronize do
      # Indicate to the outside world that we are closing
      @closing = true

      thread_status = :unknown
      if @polling_thread
        # Indicate to polling thread that we're closing
        @polling_thread[:closing] = true

        # Wait for the polling thread to finish up,
        # this can be aborted in practice if this
        # code runs from a finalizer.
        # Thread#join returns nil when the timeout elapses first, which is
        # what marks the close as forceful below.
        thread_status = @polling_thread.join(timeout)
      end

      # Destroy the client after locking both mutexes
      @poll_mutex.lock

      # This check prevents a race condition where two threads enter close:
      # once the thread holding the lock has finished and released it, the
      # waiting thread would continue and try to destroy the inner handle a
      # second time. Nothing is left to destroy, so report a regular close.
      return true unless @inner

      # Let the caller release resources that depend on the native handle
      # while that handle is still alive.
      yield if block_given?

      Rdkafka::Bindings.rd_kafka_destroy(@inner)
      @inner = nil
      @opaque = nil

      !thread_status.nil?
    end
  end
end

class Rdkafka::Producer
  # Closes the producer, giving the native client at most +timeout+ seconds
  # to shut down.
  #
  # Relies on +closed?+, +@native_kafka+ and +@topics_refs_map+, which are set
  # up outside this patch.
  #
  # @param timeout [Numeric, nil] forwarded to the native client's close
  # @return [Boolean] true when already closed; otherwise whatever the native
  #   client's close reports (documented there as false on a forceful close)
  def close(timeout = nil)
    return true if closed?
    ObjectSpace.undefine_finalizer(self)

    # Keep the native result instead of discarding it, so callers can tell a
    # forceful close apart from a graceful one.
    closed_gracefully = @native_kafka.close(timeout) do
      # We need to remove the topic reference objects before we destroy the
      # producer, otherwise they would leak out. The block is handed to the
      # native close; it only runs if that method yields.
      @topics_refs_map.each_value do |refs|
        refs.each_value do |ref|
          Rdkafka::Bindings.rd_kafka_topic_destroy(ref)
        end
      end
    end

    @topics_refs_map.clear
    closed_gracefully
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-plugin-kafka-0.19.3 lib/fluent/plugin/rdkafka_patch/0_16_0.rb