lib/fluent/plugin/out_dedup.rb in fluent-plugin-dedup-0.1.0 vs lib/fluent/plugin/out_dedup.rb in fluent-plugin-dedup-0.2.0

- old
+ new

@@ -1,12 +1,14 @@ require 'json' +require 'lru_redux' class Fluent::DedupOutput < Fluent::Output Fluent::Plugin.register_output('dedup', self) config_param :key, :string, :default => nil config_param :file, :string, :default => nil + config_param :cache_per_tag, :size, :default => 1 # Define `log` method for v0.10.42 or earlier unless method_defined?(:log) define_method("log") { $log } end @@ -22,13 +24,11 @@ end def start super - if not @file.nil? and File.file?(@file) - @states = JSON.parse(File.open(@file).read) rescue {} - end + restore_states end def shutdown super @@ -36,32 +36,54 @@ end def emit(tag, es, chain) es.each do |time, record| next if dup?(tag, record) - update_states(tag, record) Fluent::Engine.emit("dedup.#{tag}", time, record) end chain.next end private + def restore_states + if not @file.nil? and File.file?(@file) + dump = JSON.parse(File.open(@file).read) rescue {} + dump.each do |tag, ids| + lru = new_lru + ids.each {|id| lru[id] = true} + @states[tag] = lru + end + end + end + def save_states unless @file.nil? File.open(@file, 'wb') do |f| - f.print(@states.to_json) + dump = {} + @states.each do |tag, lru| + dump[tag] = lru.to_a.map(&:first) + end + f.print(dump.to_json) end end end def dup?(tag, record) - unless record.include?(@key) + is_dup = false + if record.include?(@key) + @states[tag] = new_lru unless @states.include?(tag) + if @states[tag].fetch(record[@key]) + is_dup = true + else + @states[tag][record[@key]] = true + end + else log.warn "record does not have key `#{@key}`, record: #{record.to_json}" end - @states[tag] == record[@key] + is_dup end - def update_states(tag, record) - @states[tag] = record[@key] + def new_lru + LruRedux::ThreadSafeCache.new(@cache_per_tag) end end