lib/fluent/plugin/out_datacalculator.rb in fluent-plugin-datacalculator-0.0.1 vs lib/fluent/plugin/out_datacalculator.rb in fluent-plugin-datacalculator-0.0.2

- old
+ new

@@ -1,20 +1,21 @@ +# -*- coding: utf-8 -*- class Fluent::DataCalculatorOutput < Fluent::Output Fluent::Plugin.register_output('datacalculator', self) config_param :count_interval, :time, :default => nil config_param :unit, :string, :default => 'minute' config_param :aggregate, :string, :default => 'tag' config_param :tag, :string, :default => 'datacalculate' config_param :input_tag_remove_prefix, :string, :default => nil config_param :formulas, :string config_param :finalizer, :string, :default => nil - config_param :outcast_unmatched, :bool, :default => false attr_accessor :tick attr_accessor :counts attr_accessor :last_checked + attr_accessor :aggregate_keys attr_accessor :_formulas attr_accessor :_finalizer def configure(conf) super @@ -28,23 +29,51 @@ when 'day' then 86400 else raise RuntimeError, "@unit must be one of minute/hour/day" end end + + conf.elements.each do |element| + element.keys.each do |k| + element[k] + end + + case element.name + when 'unmatched' + @unmatched = element + end + end + # TODO: unmatchedの時に別のタグを付けて、ふってあげないと行けない気がする + # unmatchedの定義 + # 1. aggregate_keys を持たないレコードが入ってきた時 + # 2. fomulaで必要な要素がなかったレコードが入ってきた時 + # 3. fomulaで集計可能な数値でない場合(文字列や真偽値、正規表現、ハッシュ、配列など) + + @aggregate_keys = [] @aggregate = case @aggregate when 'tag' then :tag when 'all' then :all else - raise Fluent::ConfigError, "flowcounter aggregate allows tag/all" + if @aggregate.index('keys') == 0 + @aggregate_keys = @aggregate.split(/\s/, 2)[1] + unless @aggregate_keys + raise Fluent::ConfigError, "aggregate_keys require in keys" + end + @aggregate_keys = @aggregate_keys.split(/\s*,\s*/) + @aggregate = 'keys' + else + raise Fluent::ConfigError, "flowcounter aggregate allows tag/all" + end end def createFunc (cnt, str) str.strip! left, right = str.split(/\s*=\s*/, 2) - rights = right.scan(/[a-zA-Z][\w\d_\.\$]*/).uniq - + # Fluent moduleだけはOK + rights = right.scan(/[a-zA-Z][\w\d_\.\$\:\@]*/).uniq.select{|x| x.index('Fluent') != 0} + begin f = eval('lambda {|'+rights.join(',')+'| '+right + '}') rescue SyntaxError raise Fluent::ConfigError, "'" + str + "' is not valid" end @@ -55,25 +84,25 @@ def execFunc (tag, obj, argv, formula) if tag != nil tag = stripped_tag (tag) end _argv = [] - + argv.each {|arg| if tag != nil and tag != 'all' arg = tag + '_' + arg end _argv.push obj[arg] } formula.call(*_argv) end - @_formulas = [[0, 'unmatched', nil, nil]] + @_formulas = [] if conf.has_key?('formulas') fs = conf['formulas'].split(/\s*,\s*/) fs.each_with_index { |str,i | - @_formulas.push( createFunc(i + 1, str) ) + @_formulas.push( createFunc(i, str) ) } end if conf.has_key?('finalizer') @_finalizer = createFunc(0, conf['finalizer']) @@ -127,11 +156,11 @@ def countups(tag, counts) if @aggregate == :all tag = 'all' end - + @mutex.synchronize { @counts[tag] ||= [0] * @_formulas.length counts.each_with_index do |count, i| @counts[tag][i] += count end @@ -144,55 +173,73 @@ return tag[@removed_length..-1] if tag == @input_tag_remove_prefix tag end def generate_output(counts, step) - output = {} if @aggregate == :all - # index 0 is unmatched - sum = if @outcast_unmatched - counts['all'][1..-1].inject(:+) - else - counts['all'].inject(:+) - end + output = {} counts['all'].each_with_index do |count,i| name = @_formulas[i][1] output[name] = count end if @_finalizer output[@_finalizer[1]] = execFunc('all', output, @_finalizer[2], @_finalizer[3]) end - return output + return [output] end + if @aggregate == 'keys' + outputs = [] + + counts.keys.each do |pat| + output = {} + pat_val = pat.split('_').map{|x| x.to_i } + counts[pat].each_with_index do |count, i| + name = @_formulas[i][1] + output[name] = count + end + + @aggregate_keys.each_with_index do |key, i| + output[@aggregate_keys[i]] = pat_val[i] + end + + if @_finalizer + output[@_finalizer[1]] = execFunc('all', output, @_finalizer[2], @_finalizer[3]) + end + + outputs.push(output) + end + + return outputs + end + + output = {} counts.keys.each do |tag| t = stripped_tag(tag) - sum = if @outcast_unmatched - counts[tag][1..-1].inject(:+) - else - counts[tag].inject(:+) - end counts[tag].each_with_index do |count,i| name = @_formulas[i][1] output[t + '_' + name] = count end if @_finalizer output[t + '_' + @_finalizer[1]] = execFunc(tag, output, @_finalizer[2], @_finalizer[3]) end end - output + [output] end def flush(step) - flushed,@counts = @counts,count_initialized(@counts.keys.dup) + flushed, @counts = @counts,count_initialized(@counts.keys.dup) generate_output(flushed, step) end def flush_emit(step) - Fluent::Engine.emit(@tag, Fluent::Engine.now, flush(step)) + data = flush(step) + data.each do |dat| + Fluent::Engine.emit(@tag, Fluent::Engine.now, dat) + end end def start_watch # for internal, or tests only @watcher = Thread.new(&method(:watch)) @@ -218,26 +265,63 @@ end } return true end - def emit(tag, es, chain) - c = [0] * @_formulas.length + def emit (tag, es, chain) + + if @aggregate == 'keys' + emit_aggregate_keys(tag, es, chain) + else + emit_single_tag(tag, es, chain) + end + end + def emit_aggregate_keys (tag, es, chain) + cs = {} + es.each do |time, record| + matched = false + pat = @aggregate_keys.map{ |key| record[key] }.join('_') + cs[pat] = [0] * @_formulas.length unless cs.has_key?(pat) + + if @_formulas.length > 0 + @_formulas.each do | index, outkey, inkeys, formula| + next unless formula and checkArgs(record, inkeys) + + cs[pat][index] += execFunc('all', record, inkeys, formula) + matched = true + end + else + $log.warn index + end + cs[pat][0] += 1 unless matched + end + + cs.keys.each do |pat| + countups(pat, cs[pat]) + end + + chain.next + end + + def emit_single_tag (tag, es, chain) + c = [0] * @_formulas.length + es.each do |time,record| matched = false if @_formulas.length > 0 @_formulas.each do |index, outkey, inkeys, formula| next unless formula and checkArgs(record, inkeys) c[index] += execFunc(nil, record, inkeys, formula) - matched = true + matched = true end else $log.warn index end c[0] += 1 unless matched end + countups(tag, c) chain.next end end