Sha256: 69cbaa5c5fac32cf2ad92d1f9bd85829333a734a677f40a3b801e92cdd3d8b6e
Contents?: true
Size: 1.31 KB
Versions: 1
Compression:
Stored size: 1.31 KB
Contents
require 'json' class Fluent::DedupOutput < Fluent::Output Fluent::Plugin.register_output('dedup', self) config_param :key, :string, :default => nil config_param :file, :string, :default => nil # Define `log` method for v0.10.42 or earlier unless method_defined?(:log) define_method("log") { $log } end def configure(conf) super unless conf.include?('key') raise Fluent::ConfigError, "config parameter `key` is required" end @key = conf['key'] @file = conf['file'] @states = {} end def start super if not @file.nil? and File.file?(@file) @states = JSON.parse(File.open(@file).read) rescue {} end end def shutdown super save_states end def emit(tag, es, chain) es.each do |time, record| next if dup?(tag, record) update_states(tag, record) Fluent::Engine.emit("dedup.#{tag}", time, record) end chain.next end private def save_states unless @file.nil? File.open(@file, 'wb') do |f| f.print(@states.to_json) end end end def dup?(tag, record) unless record.include?(@key) log.warn "record does not have key `#{@key}`, record: #{record.to_json}" end @states[tag] == record[@key] end def update_states(tag, record) @states[tag] = record[@key] end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-dedup-0.1.0 | lib/fluent/plugin/out_dedup.rb |