Sha256: 4e36675f6559cdb48e60e467cca7bb7c70edeb9ad6249524d9224d2c732fb5b0
Contents?: true
Size: 1.56 KB
Versions: 4
Compression:
Stored size: 1.56 KB
Contents
# Handler that appends events to log files on disk.
#
# Each event is routed to a file whose path is derived from the configured
# `:path` pattern, the event's (rewritten) tag and the event's time. Events
# are serialised with the configured `:convert` callable, one per line.
class FluQ::Handler::Log < FluQ::Handler::Base

  # Cache of open file handles keyed by path string.
  #
  # NOTE(review): eviction/expiry behaviour comes from TimedLRU, which is not
  # defined here — presumably bounded by `max_size` and `ttl`; confirm there.
  class FilePool < TimedLRU

    # Returns the cached handle for +path+, opening it on a cache miss.
    #
    # On a miss the parent directory is created and the file is opened in
    # append/read ("a+") mode.
    #
    # @param [#to_s] path the file path
    # @return [File] the open file handle
    def open(path)
      path = path.to_s
      self[path] ||= begin
        FileUtils.mkdir_p File.dirname(path)
        File.open(path, "a+").tap {|file| file.autoclose = true }
      end
    end

  end

  # @attr_reader [FluQ::Handler::Log::FilePool] file pool
  attr_reader :pool

  # @see FluQ::Handler::Base#initialize
  def initialize(*)
    super
    @full_path = FluQ.root.join(config[:path]).to_s.freeze
    @rewrite   = config[:rewrite]
    @convert   = config[:convert]
    @pool      = FilePool.new max_size: config[:cache_max], ttl: config[:cache_ttl]
  end

  # Groups the events by target file and appends each group to its file.
  #
  # @see FluQ::Handler::Base#on_events
  def on_events(events)
    partition(events).each {|path, slice| write(path, slice) }
  end

  protected

    # Configuration defaults:
    #
    # * `path`      - path pattern, joined onto `FluQ.root`; `%t` is replaced
    #                 by the rewritten tag, the rest is expanded via `strftime`
    #                 against the event time
    # * `rewrite`   - callable turning an event tag into a path fragment
    # * `convert`   - callable turning an event into one output line
    # * `cache_max` - passed to the file pool as `max_size`
    # * `cache_ttl` - passed to the file pool as `ttl`
    #                 (presumably seconds — confirm against TimedLRU)
    def defaults
      super.merge(
        path: "log/raw/%t/%Y%m%d/%H.log",
        rewrite: lambda {|tag| tag.tr(".", "/") },
        convert: lambda {|event| event.to_tsv },
        cache_max: 100,
        cache_ttl: 300
      )
    end

    # Appends +slice+ to the file at +path+, one converted event per line.
    #
    # The slice is rendered once and handed to the file in a single `write`
    # call, so that a retry after an IOError does not re-emit events that an
    # earlier, partially completed attempt had already written.
    #
    # On IOError the handle is dropped from the pool and the write is retried
    # with a freshly opened handle; the error is re-raised once three attempts
    # in total have failed.
    #
    # @param [String] path the target file path
    # @param [Enumerable] slice the events to write
    # @param [Integer] attepts number of attempts already made (the spelling
    #   is kept as-is to leave the method signature unchanged)
    # @return the given +slice+
    # @raise [IOError] if the write still fails on the third attempt
    def write(path, slice, attepts = 0)
      payload = slice.map {|event| "#{@convert.call(event)}\n" }.join

      begin
        @pool.open(path).write(payload)
      rescue IOError
        @pool.delete path.to_s
        attepts += 1
        raise unless attepts < 3
        retry
      end

      slice
    end

    # Groups events by the file path they should be written to.
    #
    # @param [#each] events the events to group
    # @return [Hash{String => Array}] events keyed by expanded file path
    def partition(events)
      paths = {}
      events.each do |event|
        # Double any "%" in the tag so that strftime emits it literally
        # instead of treating it as the start of a time directive.
        tag = @rewrite.call(event.tag).gsub("%", "%%")
        # Block form: the replacement is inserted verbatim, without
        # back-reference (backslash) interpretation.
        pattern = @full_path.gsub("%t") { tag }
        path    = event.time.strftime(pattern)
        (paths[path] ||= []) << event
      end
      paths
    end

end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
fluq-0.7.5 | lib/fluq/handler/log.rb |
fluq-0.7.3 | lib/fluq/handler/log.rb |
fluq-0.7.1 | lib/fluq/handler/log.rb |
fluq-0.7.0 | lib/fluq/handler/log.rb |