require 'fluent/plugin/output'

# Fluentd output plugin registered as 'numeric_monitor'.
#
# For every incoming event stream it reads the numeric value stored under
# +monitor_key+ in each record, accumulates min / max / sum / count (and,
# when +percentiles+ is configured, a bounded sample of values), and emits
# the aggregated result every +count_interval+ seconds.
class Fluent::Plugin::NumericMonitorOutput < Fluent::Plugin::Output
  Fluent::Plugin.register_output('numeric_monitor', self)

  helpers :event_emitter, :timer

  EMIT_STREAM_RECORDS = 100

  config_param :count_interval, :time, default: 60, desc: 'The interval time to monitor in seconds.'
  config_param :unit, :enum, list: [:minute, :hour, :day], default: nil
  config_param :tag, :string, default: 'monitor', desc: 'The output tag.'
  config_param :aggregate, :enum, list: [:tag, :all], default: :tag, desc: "Calculate input events per tags, or all events"

  config_param :output_per_tag, :bool, default: false,
               desc: 'Produce monitor result per input tags.'
  config_param :input_tag_remove_prefix, :string, default: nil,
               desc: 'The prefix string which will be removed from the input tag.'

  config_param :monitor_key, :string, desc: 'The key to monitor in the event record.'
  config_param :output_key_prefix, :string, default: nil,
               desc: 'The prefix string which will be added to the output key.'
  config_param :percentiles, :array, value_type: :integer, default: nil,
               desc: 'Activate the percentile monitoring. ' \
                     'Must be specified between 1 and 99 by integer separeted by , (comma).'

  config_param :samples_limit, :integer, default: 1000000,
               desc: 'The limit number of sampling.'

  config_param :tag_prefix, :string, default: nil,
               desc: 'The prefix string to be added to input tags. Use with "output_per_tag yes".',
               deprecated: 'Use @label routing instead.'

  attr_accessor :count, :last_checked

  # Applies the configuration and prepares the derived settings used by the
  # other methods (interval, prefix strings, initial counters, mutex).
  #
  # @param conf configuration object; only +has_key?('@label')+ is called on
  #   it here before it is handed to +super+.
  # @raise [Fluent::ConfigError] when +output_per_tag+ is enabled but neither
  #   +@label+ nor +tag_prefix+ is specified.
  def configure(conf)
    label_routing_specified = conf.has_key?('@label')

    super

    # +unit+, when given, overrides +count_interval+.
    if @unit
      @count_interval = case @unit
                        when :minute then 60
                        when :hour then 3600
                        when :day then 86400
                        else
                          raise "unknown unit: #{@unit}"
                        end
    end

    if @input_tag_remove_prefix
      @removed_prefix_string = @input_tag_remove_prefix + '.'
      @removed_length = @removed_prefix_string.length
    end

    @key_prefix_string = @output_key_prefix ? @output_key_prefix + '_' : ''

    if @output_per_tag && !label_routing_specified && !@tag_prefix
      raise Fluent::ConfigError, "specify @label to route output events into other <label> sections."
    end

    # Stays nil when per-tag output relies on @label routing only.
    @tag_prefix_string = (@output_per_tag && @tag_prefix) ? @tag_prefix + '.' : nil

    if system_config.workers > 1
      log.warn "Fluentd is now working with multi process workers, and numeric_monitor plugin will produce monitor results in each separated processes."
    end

    @count = count_initialized
    @mutex = Mutex.new
  end

  # @return [true] always.
  def multi_workers_ready?
    true
  end

  # Records the start time and registers the periodic timer that emits the
  # aggregated result every +count_interval+ seconds.
  def start
    super

    @last_checked = Fluent::Engine.now
    timer_execute(:out_numeric_counter_watcher, @count_interval) do
      now = Fluent::Engine.now
      flush_emit
      @last_checked = now
    end
  end

  # Builds a fresh counter table.
  #
  # @param keys [Array, nil] tags to pre-create entries for; ignored when
  #   +aggregate+ is +:all+.
  # @return [Hash] +{'all' => entry}+ for +:all+ aggregation, one entry per
  #   key when +keys+ is given, otherwise an empty hash.
  def count_initialized(keys = nil)
    # counts['tag'] = {:min => num, :max => num, :sum => num, :num => num [, :sample => [....]]}
    new_entry = lambda do
      entry = {min: nil, max: nil, sum: nil, num: 0}
      entry[:sample] = [] if @percentiles
      entry
    end

    return {'all' => new_entry.call} if @aggregate == :all
    return {} unless keys

    keys.each_with_object({}) { |key, counts| counts[key] = new_entry.call }
  end

  # Removes +input_tag_remove_prefix+ (and the following dot) from +tag+.
  #
  # @param tag [String]
  # @return [String] the tag without the prefix; an empty string when the tag
  #   is exactly the prefix; the tag unchanged when it does not match.
  def stripped_tag(tag)
    return tag unless @input_tag_remove_prefix
    # The tag is one character shorter than "prefix.", so slicing from
    # @removed_length would yield nil; return an explicit empty string.
    return '' if tag == @input_tag_remove_prefix
    return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
    tag
  end

  # Converts one counter entry into output fields.
  #
  # @param count [Hash] entry with +:min+, +:max+, +:sum+, +:num+ and, when
  #   percentiles are enabled, +:sample+.
  # @param key_prefix [String] prepended to every generated field name.
  # @param output [Hash] hash the fields are written into.
  # @return [Hash] +output+.
  def generate_fields(count, key_prefix = '', output = {})
    output[key_prefix + 'num'] = count[:num] if count[:num]
    output[key_prefix + 'min'] = count[:min] if count[:min]
    output[key_prefix + 'max'] = count[:max] if count[:max]
    output[key_prefix + 'avg'] = (count[:sum] / (count[:num] * 1.0)) if count[:num] > 0
    output[key_prefix + 'sum'] = count[:sum] if count[:sum]
    if @percentiles
      sorted = count[:sample].sort
      @percentiles.each do |p|
        # Index into the values actually held: the sample may be smaller than
        # count[:num] once samples_limit trimming has dropped values.
        i = (sorted.size * p / 100).floor
        i -= 1 if i > 0
        output[key_prefix + "percentile_#{p}"] = sorted[i]
      end
    end
    output
  end

  # Builds the message(s) to emit from a counter table.
  #
  # @param count [Hash] counter table as built by +count_initialized+ /
  #   +countups+.
  # @return [Hash] with +output_per_tag+: tag => fields; otherwise a single
  #   flat hash of fields.
  def generate_output(count)
    if @aggregate == :all
      fields = generate_fields(count['all'], @key_prefix_string)
      # tag_prefix_all: { 'key_prefix_min' => -10, 'key_prefix_max' => 10, ... } }
      # or
      # tag: { 'key_prefix_min' => -10, 'key_prefix_max' => 10, ... }
      return @output_per_tag ? {'all' => fields} : fields
    end

    output = {}
    if @output_per_tag
      # tag_prefix_tag1: { 'key_prefix_min' => -10, 'key_prefix_max' => 10, ... }
      # tag_prefix_tag2: { 'key_prefix_min' => -10, 'key_prefix_max' => 10, ... }
      count.each do |tag, entry|
        output[stripped_tag(tag)] = generate_fields(entry, @key_prefix_string)
      end
    else
      # tag: { 'key_prefix_tag1_min' => -10, 'key_prefix_tag1_max' => 10, ..., 'key_prefix_tag2_min' => -10, 'key_prefix_tag2_max' => 10, ... }
      count.each do |tag, entry|
        key_prefix = @key_prefix_string + stripped_tag(tag) + '_'
        generate_fields(entry, key_prefix, output)
      end
    end
    output
  end

  # Replaces the current counters with fresh ones (keeping the known tags)
  # and returns the output generated from the counters collected so far.
  #
  # @return [Hash] see +generate_output+.
  def flush
    # countups mutates @count while holding @mutex, so the swap takes the
    # same lock to avoid handing out a table that is still being updated.
    flushed = @mutex.synchronize do
      previous = @count
      @count = count_initialized(previous.keys)
      previous
    end
    generate_output(flushed)
  end

  # Flushes the counters and emits the result through the router: one event
  # per tag with +output_per_tag+, otherwise a single event tagged +tag+.
  def flush_emit
    time = Fluent::Engine.now
    if @output_per_tag
      flush.each do |tag, message|
        # @tag_prefix_string is nil when only @label routing is configured;
        # interpolation then yields the bare tag instead of raising.
        router.emit("#{@tag_prefix_string}#{tag}", time, message)
      end
    else
      router.emit(@tag, time, flush)
    end
  end

  # Merges one batch of statistics into the counter entry for +tag+
  # (or for 'all' when +aggregate+ is +:all+).
  #
  # @param tag [String]
  # @param min [Numeric, nil] batch minimum; nil when the batch had no values.
  # @param max [Numeric, nil] batch maximum; nil when the batch had no values.
  # @param sum [Numeric] batch sum.
  # @param num [Integer] number of values in the batch.
  # @param sample [Array, nil] batch values; only read when percentiles are
  #   enabled.
  def countups(tag, min, max, sum, num, sample)
    tag = 'all' if @aggregate == :all

    @mutex.synchronize do
      c = (@count[tag] ||= {min: nil, max: nil, sum: nil, num: 0})

      # A batch without values carries nil min/max; comparing a stored number
      # against nil would raise, so such batches leave min/max untouched.
      c[:min] = min if min && (c[:min].nil? || c[:min] > min)
      c[:max] = max if max && (c[:max].nil? || c[:max] < max)
      c[:sum] = (c[:sum] || 0) + sum
      c[:num] += num

      if @percentiles
        c[:sample] ||= []
        # Randomly evict already stored values so that the merged sample
        # does not exceed samples_limit.
        overflow = c[:sample].size + sample.size - @samples_limit
        if overflow > 0
          overflow.times do
            c[:sample].delete_at(rand(c[:sample].size))
          end
        end
        c[:sample].concat(sample)
      end
    end
  end

  # Computes min / max / sum / count (and the value sample when percentiles
  # are enabled) over the records of one event stream and merges them into
  # the counters. Records without +monitor_key+ are skipped; values are
  # converted with +to_f+.
  #
  # @param tag [String] tag of the event stream.
  # @param es event stream; must yield (time, record) pairs from +each+.
  def process(tag, es)
    min = nil
    max = nil
    sum = 0
    num = 0
    sample = @percentiles ? [] : nil

    es.each do |_time, record|
      value = record[@monitor_key]
      next if value.nil?

      value = value.to_f
      min = value if min.nil? || min > value
      max = value if max.nil? || max < value
      sum += value
      num += 1

      sample.push(value) if @percentiles
    end

    # An oversized batch is randomly thinned down to half of samples_limit
    # before it is merged into the stored sample.
    if @percentiles && sample.size > @samples_limit
      (sample.size - @samples_limit / 2).to_i.times do
        sample.delete_at(rand(sample.size))
      end
    end
    countups(tag, min, max, sum, num, sample)
  end
end