lib/fluent/plugin/out_groupcounter.rb in fluent-plugin-groupcounter-0.1.0 vs lib/fluent/plugin/out_groupcounter.rb in fluent-plugin-groupcounter-0.2.0
- old
+ new
@@ -1,29 +1,49 @@
class Fluent::GroupCounterOutput < Fluent::Output
Fluent::Plugin.register_output('groupcounter', self)
+ def initialize
+ super
+ require 'pathname'
+ end
+
config_param :count_interval, :time, :default => nil
config_param :unit, :string, :default => 'minute'
config_param :output_per_tag, :bool, :default => false
config_param :aggregate, :string, :default => 'tag'
config_param :tag, :string, :default => 'groupcount'
config_param :tag_prefix, :string, :default => nil
config_param :input_tag_remove_prefix, :string, :default => nil
- config_param :group_by_keys, :string
- config_param :output_messages, :bool, :default => false
+ config_param :group_by_keys, :string, :default => nil
+ config_param :group_by_expression, :string, :default => nil
+ config_param :max_key, :string, :default => nil
+ config_param :min_key, :string, :default => nil
+ config_param :avg_key, :string, :default => nil
+ config_param :delimiter, :string, :default => '_'
+ config_param :count_suffix, :string, :default => '_count'
+ config_param :max_suffix, :string, :default => '_max'
+ config_param :min_suffix, :string, :default => '_min'
+ config_param :avg_suffix, :string, :default => '_avg'
+ config_param :store_file, :string, :default => nil
- attr_accessor :tick
+ attr_accessor :count_interval
attr_accessor :counts
+ attr_accessor :saved_duration
+ attr_accessor :saved_at
attr_accessor :last_checked
def configure(conf)
super
+ if @group_by_keys.nil? and @group_by_expression.nil?
+ raise Fluent::ConfigError, "Either of group_by_keys or group_by_expression must be specified"
+ end
+
if @count_interval
- @tick = @count_interval.to_i
+ @count_interval = @count_interval.to_i
else
- @tick = case @unit
+ @count_interval = case @unit
when 'minute' then 60
when 'hour' then 3600
when 'day' then 86400
else
raise RuntimeError, "@unit must be one of minute/hour/day"
@@ -45,157 +65,268 @@
if @input_tag_remove_prefix
@removed_prefix_string = @input_tag_remove_prefix + '.'
@removed_length = @removed_prefix_string.length
end
- @group_by_keys = @group_by_keys.split(',')
+ @group_by_keys = @group_by_keys.split(',') if @group_by_keys
+ if @store_file
+ f = Pathname.new(@store_file)
+ if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?)
+ raise Fluent::ConfigError, "#{@store_file} is not writable"
+ end
+ end
+
@counts = count_initialized
+ @hostname = Socket.gethostname
@mutex = Mutex.new
end
def start
super
+ load_status(@store_file, @count_interval) if @store_file
start_watch
end
def shutdown
super
@watcher.terminate
@watcher.join
+ save_status(@store_file) if @store_file
end
def count_initialized
- # counts['tag'][group_by_keys] = count
- # counts['tag'][__sum] = sum
{}
end
- def countups(tag, counts)
- if @aggregate == :all
- tag = 'all'
- end
- @counts[tag] ||= {}
-
- @mutex.synchronize {
- sum = 0
- counts.each do |key, count|
- sum += count
- @counts[tag][key] ||= 0
- @counts[tag][key] += count
- end
- @counts[tag]['__sum'] ||= 0
- @counts[tag]['__sum'] += sum
- }
- end
+ def generate_fields(counts_per_tag, output = {}, key_prefix = '')
+ return {} unless counts_per_tag
+ # total_count = counts_per_tag.delete('__total_count')
- def stripped_tag(tag)
- return tag unless @input_tag_remove_prefix
- return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) and tag.length > @removed_length
- return tag[@removed_length..-1] if tag == @input_tag_remove_prefix
- tag
- end
-
- def generate_fields(step, target_counts, attr_prefix, output)
- return {} unless target_counts
- sum = target_counts['__sum']
- messages = target_counts.delete('__sum')
-
- target_counts.each do |key, count|
- output[attr_prefix + key + '_count'] = count
- output[attr_prefix + key + '_rate'] = ((count * 100.0) / (1.00 * step)).floor / 100.0
- output[attr_prefix + key + '_percentage'] = count * 100.0 / (1.00 * sum) if sum > 0
- if @output_messages
- output[attr_prefix + 'messages'] = messages
- end
+ counts_per_tag.each do |group_key, count|
+ output[key_prefix + group_key + @count_suffix] = count[:count] if count[:count]
+ output[key_prefix + group_key + "#{@delimiter}#{@min_key}#{@min_suffix}"] = count[:min] if count[:min]
+ output[key_prefix + group_key + "#{@delimiter}#{@max_key}#{@max_suffix}"] = count[:max] if count[:max]
+ output[key_prefix + group_key + "#{@delimiter}#{@avg_key}#{@avg_suffix}"] = count[:sum] / (count[:count] * 1.0) if count[:sum] and count[:count] > 0
+ # output[key_prefix + group_key + "#{@delimiter}rate"] = ((count[:count] * 100.0) / (1.00 * step)).floor / 100.0
+ # output[key_prefix + group_key + "#{@delimiter}percentage"] = count[:count] * 100.0 / (1.00 * total_count) if total_count > 0
end
output
end
- def generate_output(counts, step)
- if @aggregate == :all
- return generate_fields(step, counts['all'], '', {})
- end
+ def generate_output(counts)
+ if @output_per_tag # tag => output
+ return {'all' => generate_fields(counts['all'])} if @aggregate == :all
- output = {}
- counts.keys.each do |tag|
- generate_fields(step, counts[tag], stripped_tag(tag) + '_', output)
- end
- output
- end
+ output_pairs = {}
+ counts.keys.each do |tag|
+ output_pairs[stripped_tag(tag)] = generate_fields(counts[tag])
+ end
+ output_pairs
+ else
+ return generate_fields(counts['all']) if @aggregate == :all
- def generate_output_per_tags(counts, step)
- if @aggregate == :all
- return {'all' => generate_fields(step, counts['all'], '', {})}
+ output = {}
+ counts.keys.each do |tag|
+ generate_fields(counts[tag], output, stripped_tag(tag) + '_')
+ end
+ output
end
-
- output_pairs = {}
- counts.keys.each do |tag|
- output_pairs[stripped_tag(tag)] = generate_fields(step, counts[tag], '', {})
- end
- output_pairs
end
- def flush(step) # returns one message
- flushed,@counts = @counts,count_initialized()
- generate_output(flushed, step)
+ def flush
+ flushed, @counts = @counts, count_initialized()
+ generate_output(flushed)
end
- def flush_per_tags(step) # returns map of tag - message
- flushed,@counts = @counts,count_initialized()
- generate_output_per_tags(flushed, step)
- end
-
- def flush_emit(step)
+ # this method emits messages (periodically called)
+ def flush_emit
+ time = Fluent::Engine.now
if @output_per_tag
- # tag - message maps
- time = Fluent::Engine.now
- flush_per_tags(step).each do |tag,message|
+ flush.each do |tag, message|
Fluent::Engine.emit(@tag_prefix_string + tag, time, message)
end
else
- message = flush(step)
- if message.keys.size > 0
- Fluent::Engine.emit(@tag, Fluent::Engine.now, message)
- end
+ message = flush
+ Fluent::Engine.emit(@tag, time, message) unless message.empty?
end
end
def start_watch
# for internal, or tests only
@watcher = Thread.new(&method(:watch))
end
def watch
# instance variable, and publicly accessible, for test
- @last_checked = Fluent::Engine.now
+ @last_checked ||= Fluent::Engine.now
while true
sleep 0.5
- if Fluent::Engine.now - @last_checked >= @tick
- now = Fluent::Engine.now
- flush_emit(now - @last_checked)
- @last_checked = now
+ begin
+ if Fluent::Engine.now - @last_checked >= @count_interval
+ now = Fluent::Engine.now
+ flush_emit
+ @last_checked = now
+ end
+ rescue => e
+ $log.warn "#{e.class} #{e.message} #{e.backtrace.first}"
end
end
end
+ # receive messages here
def emit(tag, es, chain)
- c = {}
+ group_counts = {}
- es.each do |time,record|
- values = []
- @group_by_keys.each { |key|
- v = record[key] || 'undef'
- values.push(v)
- }
- value = values.join('_')
+ tags = tag.split('.')
+ es.each do |time, record|
+ count = {}
+ count[:count] = 1
+ count[:sum] = record[@avg_key].to_f if @avg_key and record[@avg_key]
+ count[:max] = record[@max_key].to_f if @max_key and record[@max_key]
+ count[:min] = record[@min_key].to_f if @min_key and record[@min_key]
- value = value.to_s.force_encoding('ASCII-8BIT')
- c[value] ||= 0
- c[value] += 1
+ group_key = group_key(tag, time, record)
+
+ group_counts[group_key] ||= {}
+ countup(group_counts[group_key], count)
end
- countups(tag, c)
+ summarize_counts(tag, group_counts)
chain.next
+ rescue => e
+ $log.warn "#{e.class} #{e.message} #{e.backtrace.first}"
+ end
+
+ # Summarize counts for each tag
+ def summarize_counts(tag, group_counts)
+ tag = 'all' if @aggregate == :all
+ @counts[tag] ||= {}
+
+ @mutex.synchronize {
+ group_counts.each do |group_key, count|
+ @counts[tag][group_key] ||= {}
+ countup(@counts[tag][group_key], count)
+ end
+
+ # total_count = group_counts.map {|group_key, count| count[:count] }.inject(:+)
+ # @counts[tag]['__total_count'] = sum(@counts[tag]['__total_count'], total_count)
+ }
+ end
+
+ def countup(counts, count)
+ counts[:count] = sum(counts[:count], count[:count])
+ counts[:sum] = sum(counts[:sum], count[:sum]) if @avg_key and count[:sum]
+ counts[:max] = max(counts[:max], count[:max]) if @max_key and count[:max]
+ counts[:min] = min(counts[:min], count[:min]) if @min_key and count[:min]
+ end
+
+ # Expand record with @group_by_keys, and get a value to be a group_key
+ def group_key(tag, time, record)
+ if @group_by_expression
+ tags = tag.split('.')
+ group_key = expand_placeholder(@group_by_expression, record, tag, tags, Time.at(time))
+ else # @group_by_keys
+ values = @group_by_keys.map {|key| record[key] || 'undef'}
+ group_key = values.join(@delimiter)
+ end
+ group_key = group_key.to_s.force_encoding('ASCII-8BIT')
+ end
+
+ def sum(a, b)
+ a ||= 0
+ b ||= 0
+ a + b
+ end
+
+ def max(a, b)
+ return b if a.nil?
+ return a if b.nil?
+ a > b ? a : b
+ end
+
+ def min(a, b)
+ return b if a.nil?
+ return a if b.nil?
+ a > b ? b : a
+ end
+
+ def stripped_tag(tag)
+ return tag unless @input_tag_remove_prefix
+ return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) and tag.length > @removed_length
+ return tag[@removed_length..-1] if tag == @input_tag_remove_prefix
+ tag
+ end
+
+ # Store internal status into a file
+ #
+ # @param [String] file_path
+ def save_status(file_path)
+
+ begin
+ Pathname.new(file_path).open('wb') do |f|
+ @saved_at = Fluent::Engine.now
+ @saved_duration = @saved_at - @last_checked
+ Marshal.dump({
+ :counts => @counts,
+ :saved_at => @saved_at,
+ :saved_duration => @saved_duration,
+ :aggregate => @aggregate,
+ :group_by_keys => @group_by_keys,
+ }, f)
+ end
+ rescue => e
+ $log.warn "out_groupcounter: Can't write store_file #{e.class} #{e.message}"
+ end
+ end
+
+ # Load internal status from a file
+ #
+ # @param [String] file_path
+ # @param [Integer] count_interval
+ def load_status(file_path, count_interval)
+ return unless (f = Pathname.new(file_path)).exist?
+
+ begin
+ f.open('rb') do |f|
+ stored = Marshal.load(f)
+ if stored[:aggregate] == @aggregate and
+ stored[:group_by_keys] == @group_by_keys
+
+ if Fluent::Engine.now <= stored[:saved_at] + count_interval
+ @counts = stored[:counts]
+ @saved_at = stored[:saved_at]
+ @saved_duration = stored[:saved_duration]
+
+ # skip the saved duration to continue counting
+ @last_checked = Fluent::Engine.now - @saved_duration
+ else
+ $log.warn "out_groupcounter: stored data is outdated. ignore stored data"
+ end
+ else
+ $log.warn "out_groupcounter: configuration param was changed. ignore stored data"
+ end
+ end
+ rescue => e
+ $log.warn "out_groupcounter: Can't load store_file #{e.class} #{e.message}"
+ end
+ end
+
+ private
+
+ def expand_placeholder(str, record, tag, tags, time)
+ struct = UndefOpenStruct.new(record)
+ struct.tag = tag
+ struct.tags = tags
+ struct.time = time
+ struct.hostname = @hostname
+ str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..}
+ eval "\"#{str}\"", struct.instance_eval { binding }
+ end
+
+ class UndefOpenStruct < OpenStruct
+ (Object.instance_methods).each do |m|
+ undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/
+ end
end
end