lib/fluent/plugin/out_datacalculator.rb in fluent-plugin-datacalculator-0.0.1 vs lib/fluent/plugin/out_datacalculator.rb in fluent-plugin-datacalculator-0.0.2
- old
+ new
@@ -1,20 +1,21 @@
+# -*- coding: utf-8 -*-
class Fluent::DataCalculatorOutput < Fluent::Output
Fluent::Plugin.register_output('datacalculator', self)
config_param :count_interval, :time, :default => nil
config_param :unit, :string, :default => 'minute'
config_param :aggregate, :string, :default => 'tag'
config_param :tag, :string, :default => 'datacalculate'
config_param :input_tag_remove_prefix, :string, :default => nil
config_param :formulas, :string
config_param :finalizer, :string, :default => nil
- config_param :outcast_unmatched, :bool, :default => false
attr_accessor :tick
attr_accessor :counts
attr_accessor :last_checked
+ attr_accessor :aggregate_keys
attr_accessor :_formulas
attr_accessor :_finalizer
def configure(conf)
super
@@ -28,23 +29,51 @@
when 'day' then 86400
else
raise RuntimeError, "@unit must be one of minute/hour/day"
end
end
+
+ conf.elements.each do |element|
+ element.keys.each do |k|
+ element[k]
+ end
+
+ case element.name
+ when 'unmatched'
+ @unmatched = element
+ end
+ end
+ # TODO: unmatchedの時に別のタグを付けて、ふってあげないと行けない気がする
+ # unmatchedの定義
+ # 1. aggregate_keys を持たないレコードが入ってきた時
+ # 2. fomulaで必要な要素がなかったレコードが入ってきた時
+ # 3. fomulaで集計可能な数値でない場合(文字列や真偽値、正規表現、ハッシュ、配列など)
+
+ @aggregate_keys = []
@aggregate = case @aggregate
when 'tag' then :tag
when 'all' then :all
else
- raise Fluent::ConfigError, "flowcounter aggregate allows tag/all"
+ if @aggregate.index('keys') == 0
+ @aggregate_keys = @aggregate.split(/\s/, 2)[1]
+ unless @aggregate_keys
+ raise Fluent::ConfigError, "aggregate_keys require in keys"
+ end
+ @aggregate_keys = @aggregate_keys.split(/\s*,\s*/)
+ @aggregate = 'keys'
+ else
+ raise Fluent::ConfigError, "flowcounter aggregate allows tag/all"
+ end
end
def createFunc (cnt, str)
str.strip!
left, right = str.split(/\s*=\s*/, 2)
- rights = right.scan(/[a-zA-Z][\w\d_\.\$]*/).uniq
-
+ # Fluent moduleだけはOK
+ rights = right.scan(/[a-zA-Z][\w\d_\.\$\:\@]*/).uniq.select{|x| x.index('Fluent') != 0}
+
begin
f = eval('lambda {|'+rights.join(',')+'| '+right + '}')
rescue SyntaxError
raise Fluent::ConfigError, "'" + str + "' is not valid"
end
@@ -55,25 +84,25 @@
def execFunc (tag, obj, argv, formula)
if tag != nil
tag = stripped_tag (tag)
end
_argv = []
-
+
argv.each {|arg|
if tag != nil and tag != 'all'
arg = tag + '_' + arg
end
_argv.push obj[arg]
}
formula.call(*_argv)
end
- @_formulas = [[0, 'unmatched', nil, nil]]
+ @_formulas = []
if conf.has_key?('formulas')
fs = conf['formulas'].split(/\s*,\s*/)
fs.each_with_index { |str,i |
- @_formulas.push( createFunc(i + 1, str) )
+ @_formulas.push( createFunc(i, str) )
}
end
if conf.has_key?('finalizer')
@_finalizer = createFunc(0, conf['finalizer'])
@@ -127,11 +156,11 @@
def countups(tag, counts)
if @aggregate == :all
tag = 'all'
end
-
+
@mutex.synchronize {
@counts[tag] ||= [0] * @_formulas.length
counts.each_with_index do |count, i|
@counts[tag][i] += count
end
@@ -144,55 +173,73 @@
return tag[@removed_length..-1] if tag == @input_tag_remove_prefix
tag
end
def generate_output(counts, step)
- output = {}
if @aggregate == :all
- # index 0 is unmatched
- sum = if @outcast_unmatched
- counts['all'][1..-1].inject(:+)
- else
- counts['all'].inject(:+)
- end
+ output = {}
counts['all'].each_with_index do |count,i|
name = @_formulas[i][1]
output[name] = count
end
if @_finalizer
output[@_finalizer[1]] = execFunc('all', output, @_finalizer[2], @_finalizer[3])
end
- return output
+ return [output]
end
+ if @aggregate == 'keys'
+ outputs = []
+
+ counts.keys.each do |pat|
+ output = {}
+ pat_val = pat.split('_').map{|x| x.to_i }
+ counts[pat].each_with_index do |count, i|
+ name = @_formulas[i][1]
+ output[name] = count
+ end
+
+ @aggregate_keys.each_with_index do |key, i|
+ output[@aggregate_keys[i]] = pat_val[i]
+ end
+
+ if @_finalizer
+ output[@_finalizer[1]] = execFunc('all', output, @_finalizer[2], @_finalizer[3])
+ end
+
+ outputs.push(output)
+ end
+
+ return outputs
+ end
+
+ output = {}
counts.keys.each do |tag|
t = stripped_tag(tag)
- sum = if @outcast_unmatched
- counts[tag][1..-1].inject(:+)
- else
- counts[tag].inject(:+)
- end
counts[tag].each_with_index do |count,i|
name = @_formulas[i][1]
output[t + '_' + name] = count
end
if @_finalizer
output[t + '_' + @_finalizer[1]] = execFunc(tag, output, @_finalizer[2], @_finalizer[3])
end
end
- output
+ [output]
end
def flush(step)
- flushed,@counts = @counts,count_initialized(@counts.keys.dup)
+ flushed, @counts = @counts,count_initialized(@counts.keys.dup)
generate_output(flushed, step)
end
def flush_emit(step)
- Fluent::Engine.emit(@tag, Fluent::Engine.now, flush(step))
+ data = flush(step)
+ data.each do |dat|
+ Fluent::Engine.emit(@tag, Fluent::Engine.now, dat)
+ end
end
def start_watch
# for internal, or tests only
@watcher = Thread.new(&method(:watch))
@@ -218,26 +265,63 @@
end
}
return true
end
- def emit(tag, es, chain)
- c = [0] * @_formulas.length
+ def emit (tag, es, chain)
+
+ if @aggregate == 'keys'
+ emit_aggregate_keys(tag, es, chain)
+ else
+ emit_single_tag(tag, es, chain)
+ end
+ end
+ def emit_aggregate_keys (tag, es, chain)
+ cs = {}
+ es.each do |time, record|
+ matched = false
+ pat = @aggregate_keys.map{ |key| record[key] }.join('_')
+ cs[pat] = [0] * @_formulas.length unless cs.has_key?(pat)
+
+ if @_formulas.length > 0
+ @_formulas.each do | index, outkey, inkeys, formula|
+ next unless formula and checkArgs(record, inkeys)
+
+ cs[pat][index] += execFunc('all', record, inkeys, formula)
+ matched = true
+ end
+ else
+ $log.warn index
+ end
+ cs[pat][0] += 1 unless matched
+ end
+
+ cs.keys.each do |pat|
+ countups(pat, cs[pat])
+ end
+
+ chain.next
+ end
+
+ def emit_single_tag (tag, es, chain)
+ c = [0] * @_formulas.length
+
es.each do |time,record|
matched = false
if @_formulas.length > 0
@_formulas.each do |index, outkey, inkeys, formula|
next unless formula and checkArgs(record, inkeys)
c[index] += execFunc(nil, record, inkeys, formula)
- matched = true
+ matched = true
end
else
$log.warn index
end
c[0] += 1 unless matched
end
+
countups(tag, c)
chain.next
end
end