lib/fluq/handler/log.rb in fluq-0.7.5 vs lib/fluq/handler/log.rb in fluq-0.8.0
- old
+ new
@@ -18,14 +18,14 @@
attr_reader :pool
# @see FluQ::Handler::Base#initialize
def initialize(*)
super
- @full_path = FluQ.root.join(config[:path]).to_s.freeze
- @rewrite = config[:rewrite]
- @convert = config[:convert]
- @pool = FilePool.new max_size: config[:cache_max], ttl: config[:cache_ttl]
+ @path = config[:path]
+ @rewrite = config[:rewrite]
+ @convert = config[:convert]
+ @pool = FilePool.new max_size: config[:cache_max], ttl: config[:cache_ttl]
end
# @see FluQ::Handler::Base#on_events
def on_events(events)
partition(events).each {|path, slice| write(path, slice) }
@@ -34,34 +34,32 @@
protected
# Configuration defaults
def defaults
super.merge \
- path: "log/raw/%t/%Y%m%d/%H.log",
- rewrite: lambda {|tag| tag.gsub(".", "/") },
- convert: lambda {|event| event.to_tsv },
+ path: "log/raw/%Y%m%d.log",
+ convert: ->evt { [evt.timestamp, MultiJson.dump(evt)].join("\t") },
cache_max: 100,
cache_ttl: 300
end
def write(path, slice, attepts = 0)
io = @pool.open(path)
slice.each do |event|
- io.write "#{@convert.call(event)}\n"
+ io.write @convert.call(event) << "\n"
end
rescue IOError
@pool.delete path.to_s
(attepts+=1) < 3 ? retry : raise
end
def partition(events)
- paths = {}
+ paths = Hash.new {|h,k| h[k] = [] }
events.each do |event|
- tag = @rewrite.call(event.tag)
- path = event.time.strftime(@full_path.gsub("%t", tag))
- paths[path] ||= []
- paths[path] << event
+ tag = @rewrite ? @rewrite.call(event).to_s : ""
+ path = event.time.strftime(FluQ.root.join(@path).to_s.gsub("%t", tag))
+ paths[path] << event
end
paths
end
-end
\ No newline at end of file
+end