# coding: utf-8

class RubyKafkaOutput < Fluent::Output
  Fluent::Plugin.register_output('ruby_kafka', self)

  config_param :zookeepers, :string, :default => nil

  config_param :brokers, :string, :default => nil

  config_param :default_topic, :string, :default => nil

  config_param :max_queue_size, :integer, :default => 1000

  config_param :delivery_threshold, :integer, :default => 0

  config_param :delivery_interval, :integer, :default => 0

  config_param :retry_count, :integer, :default => 3

  config_param :output_include_tag, :bool, :default => false

  config_param :output_include_time, :bool, :default => false

  config_param :use_kafka_log, :bool, :default => false

  @seed_brokers = []

  def initialize
    super
    require "kafka"
    require "zookeeper"
    require "json"
  end

  def configure(conf)
    super
    @seed_brokers = []
    if @zookeepers
      z = Zookeeper.new(@zookeepers)
      z.get_children(:path => '/brokers/ids')[:children].each do |id|
        broker = Yajl.load(z.get(:path => "/brokers/ids/#{id}")[:data])
        @seed_brokers.push("#{broker['host']}:#{broker['port']}")
      end
      z.close
      log.info "brokers has been refreshed via Zookeeper: #{@seed_brokers}"
    end

    if @seed_brokers.empty? and @brokers
      @seed_brokers = @brokers.match(",").nil? ? [@brokers] : @brokers.split(",")
      log.info "brokers has been set directly: #{@seed_brokers}"
    end

    raise Fluent::ConfigError, "Broker has not been set." if @seed_brokers.empty?

    kafka = Kafka.new(seed_brokers: @seed_brokers, logger: @use_kafka_log ? log : nil)
    @producer = kafka.async_producer(
      max_queue_size: @max_queue_size,
      delivery_threshold: @delivery_threshold,
      delivery_interval: @delivery_interval,
    )
  end

  def start
    super
  end

  def shutdown
    super
    @producer.shutdown if @producer
  end

  def emit(tag, es, chain)
    chain.next
    es.each do |time,record|
      record['time'] = time if @output_include_time
      record['tag'] = tag if @output_include_tag
      topic = record['topic'] || @default_topic || tag

      data = JSON.dump(record)
      retry_counter = 0
      begin
        retry_counter += 1
        @producer.produce(data, topic: topic)
      rescue Kafka::BufferOverflow
        if retry_counter <= @retry_count
          log.warn "Buffer overflow, backing off for 1s. #{retry_counter} time."
          sleep 1
          retry
        else
          raise
        end
      end
    end
  end

end