lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.1.1 vs lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.1.2

- old
+ new

@@ -9,10 +9,11 @@ config_param :threshold, :integer, :default => 1 config_param :output_tag, :string, :default => nil config_param :add_tag_prefix, :string, :default => 'count' config_param :output_with_joined_delimiter, :string, :default => nil config_param :aggregate, :string, :default => 'tag' + config_param :replace_invalid_sequence, :bool, :default => false attr_accessor :matches attr_accessor :last_checked def configure(conf) @@ -55,12 +56,11 @@ def emit(tag, es, chain) count = 0; matches = [] # filter out and insert es.each do |time,record| value = record[@input_key] - next if @regexp and !@regexp.match(value) - next if @exclude and @exclude.match(value) + next unless match(value.to_s) matches << value count += 1 end # thread safe merge @counts[tag] ||= 0 @@ -69,22 +69,30 @@ @counts[tag] += count @matches[tag] += matches end chain.next + rescue => e + $log.warn e.message + $log.warn e.backtrace.join(', ') end # thread callback def watcher # instance variable, and public accessable, for test @last_checked = Fluent::Engine.now while true sleep 0.5 - if Fluent::Engine.now - @last_checked >= @count_interval - now = Fluent::Engine.now - flush_emit(now - @last_checked) - @last_checked = now + begin + if Fluent::Engine.now - @last_checked >= @count_interval + now = Fluent::Engine.now + flush_emit(now - @last_checked) + @last_checked = now + end + rescue => e + $log.warn e.message + $log.warn e.backtrace.join(", ") end end end # This method is the real one to emit @@ -110,10 +118,11 @@ end end end def generate_output(count, matches, tag = nil) + return nil if count.nil? return nil if count < @threshold output = {} output['count'] = count output['message'] = @output_with_joined_delimiter.nil? ? matches : matches.join(@output_with_joined_delimiter) if tag @@ -121,6 +130,27 @@ output['input_tag_last'] = tag.split('.').last end output end + def match(string) + begin + return false if @regexp and !@regexp.match(string) + return false if @exclude and @exclude.match(string) + rescue ArgumentError => e + unless e.message.index("invalid byte sequence in") == 0 + raise + end + string = replace_invalid_byte(string) + return false if @regexp and !@regexp.match(string) + return false if @exclude and @exclude.match(string) + end + return true + end + + def replace_invalid_byte(string) + replace_options = { invalid: :replace, undef: :replace, replace: '?' } + original_encoding = string.encoding + temporal_encoding = (original_encoding == Encoding::UTF_8 ? Encoding::UTF_16BE : Encoding::UTF_8) + string.encode(temporal_encoding, original_encoding, replace_options).encode(original_encoding) + end end