lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.1.1 vs lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.1.2
- old
+ new
@@ -9,10 +9,11 @@
config_param :threshold, :integer, :default => 1
config_param :output_tag, :string, :default => nil
config_param :add_tag_prefix, :string, :default => 'count'
config_param :output_with_joined_delimiter, :string, :default => nil
config_param :aggregate, :string, :default => 'tag'
+ config_param :replace_invalid_sequence, :bool, :default => false
attr_accessor :matches
attr_accessor :last_checked
def configure(conf)
@@ -55,12 +56,11 @@
def emit(tag, es, chain)
count = 0; matches = []
# filter out and insert
es.each do |time,record|
value = record[@input_key]
- next if @regexp and !@regexp.match(value)
- next if @exclude and @exclude.match(value)
+ next unless match(value.to_s)
matches << value
count += 1
end
# thread safe merge
@counts[tag] ||= 0
@@ -69,22 +69,30 @@
@counts[tag] += count
@matches[tag] += matches
end
chain.next
+ rescue => e
+ $log.warn e.message
+ $log.warn e.backtrace.join(', ')
end
# thread callback
def watcher
# instance variable, and public accessable, for test
@last_checked = Fluent::Engine.now
while true
sleep 0.5
- if Fluent::Engine.now - @last_checked >= @count_interval
- now = Fluent::Engine.now
- flush_emit(now - @last_checked)
- @last_checked = now
+ begin
+ if Fluent::Engine.now - @last_checked >= @count_interval
+ now = Fluent::Engine.now
+ flush_emit(now - @last_checked)
+ @last_checked = now
+ end
+ rescue => e
+ $log.warn e.message
+ $log.warn e.backtrace.join(", ")
end
end
end
# This method is the real one to emit
@@ -110,10 +118,11 @@
end
end
end
def generate_output(count, matches, tag = nil)
+ return nil if count.nil?
return nil if count < @threshold
output = {}
output['count'] = count
output['message'] = @output_with_joined_delimiter.nil? ? matches : matches.join(@output_with_joined_delimiter)
if tag
@@ -121,6 +130,27 @@
output['input_tag_last'] = tag.split('.').last
end
output
end
+ def match(string)
+ begin
+ return false if @regexp and !@regexp.match(string)
+ return false if @exclude and @exclude.match(string)
+ rescue ArgumentError => e
+ unless e.message.index("invalid byte sequence in") == 0
+ raise
+ end
+ string = replace_invalid_byte(string)
+ return false if @regexp and !@regexp.match(string)
+ return false if @exclude and @exclude.match(string)
+ end
+ return true
+ end
+
+ def replace_invalid_byte(string)
+ replace_options = { invalid: :replace, undef: :replace, replace: '?' }
+ original_encoding = string.encoding
+ temporal_encoding = (original_encoding == Encoding::UTF_8 ? Encoding::UTF_16BE : Encoding::UTF_8)
+ string.encode(temporal_encoding, original_encoding, replace_options).encode(original_encoding)
+ end
end