Sha256: 837a6278d0dac64047372d65c20f5213ab8224b4a6eb0365fd8040a69a4bfe81

Contents?: true

Size: 1.91 KB

Versions: 1

Compression:

Stored size: 1.91 KB

Contents

require 'json'
require 'lru_redux'

class Fluent::DedupOutput < Fluent::Output
  Fluent::Plugin.register_output('dedup', self)

  config_param :key, :string, :default => nil
  config_param :file, :string, :default => nil
  config_param :cache_per_tag, :size, :default => 1
  config_param :cache_ttl, :integer, :default => 0

  # Define `log` method for v0.10.42 or earlier
  unless method_defined?(:log)
    define_method("log") { $log }
  end

  def configure(conf)
    super
    unless conf.include?('key')
      raise Fluent::ConfigError, "config parameter `key` is required"
    end
    @states = {}
  end

  def start
    super

    restore_states
  end

  def shutdown
    super

    save_states
  end

  def emit(tag, es, chain)
    es.each do |time, record|
      next if dup?(tag, record)
      Fluent::Engine.emit("dedup.#{tag}", time, record)
    end

    chain.next
  end

  private
  def restore_states
    if not @file.nil? and File.file?(@file)
      dump = JSON.parse(File.open(@file).read) rescue {}
      dump.each do |tag, ids|
        lru = new_lru
        ids.each {|id| lru[id] = true}
        @states[tag] = lru
      end
    end
  end

  def save_states
    unless @file.nil?
      File.open(@file, 'wb') do |f|
        dump = {}
        @states.each do |tag, lru|
          dump[tag] = lru.to_a.map(&:first)
        end
        f.print(dump.to_json)
      end
    end
  end

  def dup?(tag, record)
    is_dup = false
    if record.include?(@key)
      @states[tag] = new_lru unless @states.include?(tag)
      if @states[tag].fetch(record[@key])
        is_dup = true
      else
        @states[tag][record[@key]] = true
      end
    else
      log.warn "record does not have key `#{@key}`, record: #{record.to_json}"
    end
    is_dup
  end

  def new_lru
    if 0 < @cache_ttl
      LruRedux::TTL::ThreadSafeCache.new(@cache_per_tag, @cache_ttl)
    else
      LruRedux::ThreadSafeCache.new(@cache_per_tag)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-dedup-0.3.0 lib/fluent/plugin/out_dedup.rb