lib/fluq/handler/log.rb in fluq-0.7.5 vs lib/fluq/handler/log.rb in fluq-0.8.0

- old
+ new

@@ -18,14 +18,14 @@ attr_reader :pool # @see FluQ::Handler::Base#initialize def initialize(*) super - @full_path = FluQ.root.join(config[:path]).to_s.freeze - @rewrite = config[:rewrite] - @convert = config[:convert] - @pool = FilePool.new max_size: config[:cache_max], ttl: config[:cache_ttl] + @path = config[:path] + @rewrite = config[:rewrite] + @convert = config[:convert] + @pool = FilePool.new max_size: config[:cache_max], ttl: config[:cache_ttl] end # @see FluQ::Handler::Base#on_events def on_events(events) partition(events).each {|path, slice| write(path, slice) } @@ -34,34 +34,32 @@ protected # Configuration defaults def defaults super.merge \ - path: "log/raw/%t/%Y%m%d/%H.log", - rewrite: lambda {|tag| tag.gsub(".", "/") }, - convert: lambda {|event| event.to_tsv }, + path: "log/raw/%Y%m%d.log", + convert: ->evt { [evt.timestamp, MultiJson.dump(evt)].join("\t") }, cache_max: 100, cache_ttl: 300 end def write(path, slice, attepts = 0) io = @pool.open(path) slice.each do |event| - io.write "#{@convert.call(event)}\n" + io.write @convert.call(event) << "\n" end rescue IOError @pool.delete path.to_s (attepts+=1) < 3 ? retry : raise end def partition(events) - paths = {} + paths = Hash.new {|h,k| h[k] = [] } events.each do |event| - tag = @rewrite.call(event.tag) - path = event.time.strftime(@full_path.gsub("%t", tag)) - paths[path] ||= [] - paths[path] << event + tag = @rewrite ? @rewrite.call(event).to_s : "" + path = event.time.strftime(FluQ.root.join(@path).to_s.gsub("%t", tag)) + paths[path] << event end paths end -end \ No newline at end of file +end