lib/fluent/plugin/out_groupcounter.rb in fluent-plugin-groupcounter-0.1.0 vs lib/fluent/plugin/out_groupcounter.rb in fluent-plugin-groupcounter-0.2.0

- old
+ new

@@ -1,29 +1,49 @@ class Fluent::GroupCounterOutput < Fluent::Output Fluent::Plugin.register_output('groupcounter', self) + def initialize + super + require 'pathname' + end + config_param :count_interval, :time, :default => nil config_param :unit, :string, :default => 'minute' config_param :output_per_tag, :bool, :default => false config_param :aggregate, :string, :default => 'tag' config_param :tag, :string, :default => 'groupcount' config_param :tag_prefix, :string, :default => nil config_param :input_tag_remove_prefix, :string, :default => nil - config_param :group_by_keys, :string - config_param :output_messages, :bool, :default => false + config_param :group_by_keys, :string, :default => nil + config_param :group_by_expression, :string, :default => nil + config_param :max_key, :string, :default => nil + config_param :min_key, :string, :default => nil + config_param :avg_key, :string, :default => nil + config_param :delimiter, :string, :default => '_' + config_param :count_suffix, :string, :default => '_count' + config_param :max_suffix, :string, :default => '_max' + config_param :min_suffix, :string, :default => '_min' + config_param :avg_suffix, :string, :default => '_avg' + config_param :store_file, :string, :default => nil - attr_accessor :tick + attr_accessor :count_interval attr_accessor :counts + attr_accessor :saved_duration + attr_accessor :saved_at attr_accessor :last_checked def configure(conf) super + if @group_by_keys.nil? and @group_by_expression.nil? + raise Fluent::ConfigError, "Either of group_by_keys or group_by_expression must be specified" + end + if @count_interval - @tick = @count_interval.to_i + @count_interval = @count_interval.to_i else - @tick = case @unit + @count_interval = case @unit when 'minute' then 60 when 'hour' then 3600 when 'day' then 86400 else raise RuntimeError, "@unit must be one of minute/hour/day" @@ -45,157 +65,268 @@ if @input_tag_remove_prefix @removed_prefix_string = @input_tag_remove_prefix + '.' @removed_length = @removed_prefix_string.length end - @group_by_keys = @group_by_keys.split(',') + @group_by_keys = @group_by_keys.split(',') if @group_by_keys + if @store_file + f = Pathname.new(@store_file) + if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?) + raise Fluent::ConfigError, "#{@store_file} is not writable" + end + end + @counts = count_initialized + @hostname = Socket.gethostname @mutex = Mutex.new end def start super + load_status(@store_file, @count_interval) if @store_file start_watch end def shutdown super @watcher.terminate @watcher.join + save_status(@store_file) if @store_file end def count_initialized - # counts['tag'][group_by_keys] = count - # counts['tag'][__sum] = sum {} end - def countups(tag, counts) - if @aggregate == :all - tag = 'all' - end - @counts[tag] ||= {} - - @mutex.synchronize { - sum = 0 - counts.each do |key, count| - sum += count - @counts[tag][key] ||= 0 - @counts[tag][key] += count - end - @counts[tag]['__sum'] ||= 0 - @counts[tag]['__sum'] += sum - } - end + def generate_fields(counts_per_tag, output = {}, key_prefix = '') + return {} unless counts_per_tag + # total_count = counts_per_tag.delete('__total_count') - def stripped_tag(tag) - return tag unless @input_tag_remove_prefix - return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) and tag.length > @removed_length - return tag[@removed_length..-1] if tag == @input_tag_remove_prefix - tag - end - - def generate_fields(step, target_counts, attr_prefix, output) - return {} unless target_counts - sum = target_counts['__sum'] - messages = target_counts.delete('__sum') - - target_counts.each do |key, count| - output[attr_prefix + key + '_count'] = count - output[attr_prefix + key + '_rate'] = ((count * 100.0) / (1.00 * step)).floor / 100.0 - output[attr_prefix + key + '_percentage'] = count * 100.0 / (1.00 * sum) if sum > 0 - if @output_messages - output[attr_prefix + 'messages'] = messages - end + counts_per_tag.each do |group_key, count| + output[key_prefix + group_key + @count_suffix] = count[:count] if count[:count] + output[key_prefix + group_key + "#{@delimiter}#{@min_key}#{@min_suffix}"] = count[:min] if count[:min] + output[key_prefix + group_key + "#{@delimiter}#{@max_key}#{@max_suffix}"] = count[:max] if count[:max] + output[key_prefix + group_key + "#{@delimiter}#{@avg_key}#{@avg_suffix}"] = count[:sum] / (count[:count] * 1.0) if count[:sum] and count[:count] > 0 + # output[key_prefix + group_key + "#{@delimiter}rate"] = ((count[:count] * 100.0) / (1.00 * step)).floor / 100.0 + # output[key_prefix + group_key + "#{@delimiter}percentage"] = count[:count] * 100.0 / (1.00 * total_count) if total_count > 0 end output end - def generate_output(counts, step) - if @aggregate == :all - return generate_fields(step, counts['all'], '', {}) - end + def generate_output(counts) + if @output_per_tag # tag => output + return {'all' => generate_fields(counts['all'])} if @aggregate == :all - output = {} - counts.keys.each do |tag| - generate_fields(step, counts[tag], stripped_tag(tag) + '_', output) - end - output - end + output_pairs = {} + counts.keys.each do |tag| + output_pairs[stripped_tag(tag)] = generate_fields(counts[tag]) + end + output_pairs + else + return generate_fields(counts['all']) if @aggregate == :all - def generate_output_per_tags(counts, step) - if @aggregate == :all - return {'all' => generate_fields(step, counts['all'], '', {})} + output = {} + counts.keys.each do |tag| + generate_fields(counts[tag], output, stripped_tag(tag) + '_') + end + output end - - output_pairs = {} - counts.keys.each do |tag| - output_pairs[stripped_tag(tag)] = generate_fields(step, counts[tag], '', {}) - end - output_pairs end - def flush(step) # returns one message - flushed,@counts = @counts,count_initialized() - generate_output(flushed, step) + def flush + flushed, @counts = @counts, count_initialized() + generate_output(flushed) end - def flush_per_tags(step) # returns map of tag - message - flushed,@counts = @counts,count_initialized() - generate_output_per_tags(flushed, step) - end - - def flush_emit(step) + # this method emits messages (periodically called) + def flush_emit + time = Fluent::Engine.now if @output_per_tag - # tag - message maps - time = Fluent::Engine.now - flush_per_tags(step).each do |tag,message| + flush.each do |tag, message| Fluent::Engine.emit(@tag_prefix_string + tag, time, message) end else - message = flush(step) - if message.keys.size > 0 - Fluent::Engine.emit(@tag, Fluent::Engine.now, message) - end + message = flush + Fluent::Engine.emit(@tag, time, message) unless message.empty? end end def start_watch # for internal, or tests only @watcher = Thread.new(&method(:watch)) end def watch # instance variable, and public accessable, for test - @last_checked = Fluent::Engine.now + @last_checked ||= Fluent::Engine.now while true sleep 0.5 - if Fluent::Engine.now - @last_checked >= @tick - now = Fluent::Engine.now - flush_emit(now - @last_checked) - @last_checked = now + begin + if Fluent::Engine.now - @last_checked >= @count_interval + now = Fluent::Engine.now + flush_emit + @last_checked = now + end + rescue => e + $log.warn "#{e.class} #{e.message} #{e.backtrace.first}" end end end + # recieve messages at here def emit(tag, es, chain) - c = {} + group_counts = {} - es.each do |time,record| - values = [] - @group_by_keys.each { |key| - v = record[key] || 'undef' - values.push(v) - } - value = values.join('_') + tags = tag.split('.') + es.each do |time, record| + count = {} + count[:count] = 1 + count[:sum] = record[@avg_key].to_f if @avg_key and record[@avg_key] + count[:max] = record[@max_key].to_f if @max_key and record[@max_key] + count[:min] = record[@min_key].to_f if @min_key and record[@min_key] - value = value.to_s.force_encoding('ASCII-8BIT') - c[value] ||= 0 - c[value] += 1 + group_key = group_key(tag, time, record) + + group_counts[group_key] ||= {} + countup(group_counts[group_key], count) end - countups(tag, c) + summarize_counts(tag, group_counts) chain.next + rescue => e + $log.warn "#{e.class} #{e.message} #{e.backtrace.first}" + end + + # Summarize counts for each tag + def summarize_counts(tag, group_counts) + tag = 'all' if @aggregate == :all + @counts[tag] ||= {} + + @mutex.synchronize { + group_counts.each do |group_key, count| + @counts[tag][group_key] ||= {} + countup(@counts[tag][group_key], count) + end + + # total_count = group_counts.map {|group_key, count| count[:count] }.inject(:+) + # @counts[tag]['__total_count'] = sum(@counts[tag]['__total_count'], total_count) + } + end + + def countup(counts, count) + counts[:count] = sum(counts[:count], count[:count]) + counts[:sum] = sum(counts[:sum], count[:sum]) if @avg_key and count[:sum] + counts[:max] = max(counts[:max], count[:max]) if @max_key and count[:max] + counts[:min] = min(counts[:min], count[:min]) if @min_key and count[:min] + end + + # Expand record with @group_by_keys, and get a value to be a group_key + def group_key(tag, time, record) + if @group_by_expression + tags = tag.split('.') + group_key = expand_placeholder(@group_by_expression, record, tag, tags, Time.at(time)) + else # @group_by_keys + values = @group_by_keys.map {|key| record[key] || 'undef'} + group_key = values.join(@delimiter) + end + group_key = group_key.to_s.force_encoding('ASCII-8BIT') + end + + def sum(a, b) + a ||= 0 + b ||= 0 + a + b + end + + def max(a, b) + return b if a.nil? + return a if b.nil? + a > b ? a : b + end + + def min(a, b) + return b if a.nil? + return a if b.nil? + a > b ? b : a + end + + def stripped_tag(tag) + return tag unless @input_tag_remove_prefix + return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) and tag.length > @removed_length + return tag[@removed_length..-1] if tag == @input_tag_remove_prefix + tag + end + + # Store internal status into a file + # + # @param [String] file_path + def save_status(file_path) + + begin + Pathname.new(file_path).open('wb') do |f| + @saved_at = Fluent::Engine.now + @saved_duration = @saved_at - @last_checked + Marshal.dump({ + :counts => @counts, + :saved_at => @saved_at, + :saved_duration => @saved_duration, + :aggregate => @aggregate, + :group_by_keys => @group_by_keys, + }, f) + end + rescue => e + $log.warn "out_groupcounter: Can't write store_file #{e.class} #{e.message}" + end + end + + # Load internal status from a file + # + # @param [String] file_path + # @param [Interger] count_interval + def load_status(file_path, count_interval) + return unless (f = Pathname.new(file_path)).exist? + + begin + f.open('rb') do |f| + stored = Marshal.load(f) + if stored[:aggregate] == @aggregate and + stored[:group_by_keys] == @group_by_keys and + + if Fluent::Engine.now <= stored[:saved_at] + count_interval + @counts = stored[:counts] + @saved_at = stored[:saved_at] + @saved_duration = stored[:saved_duration] + + # skip the saved duration to continue counting + @last_checked = Fluent::Engine.now - @saved_duration + else + $log.warn "out_groupcounter: stored data is outdated. ignore stored data" + end + else + $log.warn "out_groupcounter: configuration param was changed. ignore stored data" + end + end + rescue => e + $log.warn "out_groupcounter: Can't load store_file #{e.class} #{e.message}" + end + end + + private + + def expand_placeholder(str, record, tag, tags, time) + struct = UndefOpenStruct.new(record) + struct.tag = tag + struct.tags = tags + struct.time = time + struct.hostname = @hostname + str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..} + eval "\"#{str}\"", struct.instance_eval { binding } + end + + class UndefOpenStruct < OpenStruct + (Object.instance_methods).each do |m| + undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/ + end end end