lib/fluent/plugin/out_dedup.rb in fluent-plugin-dedup-0.1.0 vs lib/fluent/plugin/out_dedup.rb in fluent-plugin-dedup-0.2.0
- old
+ new
@@ -1,12 +1,14 @@
require 'json'
+require 'lru_redux'
class Fluent::DedupOutput < Fluent::Output
# Registers this class with Fluent::Plugin as the output named 'dedup'.
Fluent::Plugin.register_output('dedup', self)
# Name of the record field whose value is checked for duplicates
# (read as record[@key] in dup?).
config_param :key, :string, :default => nil
# Path of the JSON file the dedup state is saved to and restored from;
# when nil, both saving and restoring are skipped.
config_param :file, :string, :default => nil
# New in this version: value handed to the LRU cache constructor in new_lru,
# one cache per tag (presumably the maximum number of remembered keys —
# confirm against lru_redux).
+ config_param :cache_per_tag, :size, :default => 1
# Define `log` method for v0.10.42 or earlier
unless method_defined?(:log)
define_method("log") { $log }
end
@@ -22,13 +24,11 @@
end
# Calls the superclass `start`, then loads any previously saved dedup state.
# Old version (`-` lines): parsed the JSON state file inline and assigned the
# result straight to @states.
# New version (`+` line): delegates the loading to restore_states.
def start
super
- if not @file.nil? and File.file?(@file)
- @states = JSON.parse(File.open(@file).read) rescue {}
- end
+ restore_states
end
def shutdown
super
@@ -36,32 +36,54 @@
end
# Iterates over every (time, record) pair of the event stream `es`, skips the
# records that dup? reports as duplicates, and re-emits the remaining ones via
# Fluent::Engine under the tag "dedup.<tag>". Calls chain.next after the whole
# stream has been walked.
# Old version (`-` line): remembered the record's key value through
# update_states after the dup? check.
# New version: that call is gone; dup? itself stores a key the first time it
# is seen.
def emit(tag, es, chain)
es.each do |time, record|
next if dup?(tag, record)
- update_states(tag, record)
Fluent::Engine.emit("dedup.#{tag}", time, record)
end
chain.next
end
private
# New in this version: rebuilds @states from the JSON state file, but only
# when @file is set and points at an existing regular file.
# The parsed dump is iterated as tag => list of ids (the shape save_states
# writes); every tag gets a fresh cache from new_lru with each id stored as
# `id => true`.
# The `rescue {}` modifier turns a read/parse failure into an empty dump, so
# nothing is restored in that case.
# NOTE(review): @states must already be a Hash-like object when this runs; its
# initialization is outside this diff — confirm.
# NOTE(review): File.open(@file).read never explicitly closes the file handle.
# NOTE(review): if the cache's to_a (used by save_states) lists entries
# most-recently-used first, inserting them in that same order here would
# invert their recency after a restart — verify against lru_redux.
+ def restore_states
+ if not @file.nil? and File.file?(@file)
+ dump = JSON.parse(File.open(@file).read) rescue {}
+ dump.each do |tag, ids|
+ lru = new_lru
+ ids.each {|id| lru[id] = true}
+ @states[tag] = lru
+ end
+ end
+ end
+
# Writes the dedup state to @file as JSON; does nothing when @file is nil.
# Old version (`-` line): serialized @states directly.
# New version (`+` lines): builds a plain hash of tag => array, taking the
# first element of every entry returned by the cache's to_a (presumably the
# key of a [key, value] pair — confirm against lru_redux), and serializes
# that hash instead.
def save_states
unless @file.nil?
File.open(@file, 'wb') do |f|
- f.print(@states.to_json)
+ dump = {}
+ @states.each do |tag, lru|
+ dump[tag] = lru.to_a.map(&:first)
+ end
+ f.print(dump.to_json)
end
end
end
# Tells whether `record` repeats a key value already seen for `tag`.
# Old version (`-` lines): warned when the record had no @key field, then
# compared the single value stored in @states[tag] with record[@key].
# New version (`+` lines): keeps one cache per tag, created through new_lru on
# first use. When the record has @key and the cache lookup for its value is
# truthy, the result is true; otherwise the value is stored in the cache and
# the result is false. A record without @key only triggers the warning and is
# never reported as a duplicate.
# NOTE(review): relies on the cache's fetch returning a falsy value for a
# missing key rather than raising — confirm against lru_redux.
def dup?(tag, record)
- unless record.include?(@key)
+ is_dup = false
+ if record.include?(@key)
+ @states[tag] = new_lru unless @states.include?(tag)
+ if @states[tag].fetch(record[@key])
+ is_dup = true
+ else
+ @states[tag][record[@key]] = true
+ end
+ else
log.warn "record does not have key `#{@key}`, record: #{record.to_json}"
end
- @states[tag] == record[@key]
+ is_dup
end
# Old version (`-` lines): update_states stored record[@key] as the single
# remembered value for the tag.
# New version (`+` lines): new_lru takes its place and returns a new
# LruRedux::ThreadSafeCache constructed with @cache_per_tag (presumably the
# maximum number of entries — confirm against lru_redux).
- def update_states(tag, record)
- @states[tag] = record[@key]
+ def new_lru
+ LruRedux::ThreadSafeCache.new(@cache_per_tag)
end
end