Sha256: 1e4cc58f6c120c51a17217becc410f39266c74b9ea9b6cc40a4456c62663be5d

Contents?: true

Size: 1.53 KB

Versions: 2

Compression:

Stored size: 1.53 KB

Contents

class FluQ::Handler::Log < FluQ::Handler::Base

  class FilePool < TimedLRU

    def open(path)
      path = path.to_s
      self[path.to_s] ||= begin
        FileUtils.mkdir_p File.dirname(path)
        file = File.open(path, "a+")
        file.autoclose = true
        file
      end
    end

  end

  # @attr_reader [FluQ::Handler::Log::FilePool] file pool
  attr_reader :pool

  # @see FluQ::Handler::Base#initialize
  def initialize(*)
    super
    @path    = config[:path]
    @rewrite = config[:rewrite]
    @convert = config[:convert]
    @pool    = FilePool.new max_size: config[:cache_max], ttl: config[:cache_ttl]
  end

  # @see FluQ::Handler::Base#on_events
  def on_events(events)
    partition(events).each {|path, slice| write(path, slice) }
  end

  protected

    # Configuration defaults
    def defaults
      super.merge \
        path:     "log/raw/%Y%m%d.log",
        convert:  ->evt { [evt.timestamp, MultiJson.dump(evt)].join("\t") },
        cache_max: 100,
        cache_ttl: 300
    end

    def write(path, slice, attepts = 0)
      io = @pool.open(path)
      slice.each do |event|
        io.write @convert.call(event) << "\n"
      end
    rescue IOError
      @pool.delete path.to_s
      (attepts+=1) < 3 ? retry : raise
    end

    def partition(events)
      paths = Hash.new {|h,k| h[k] = [] }
      events.each do |event|
        tag  = @rewrite ? @rewrite.call(event).to_s : ""
        path = event.time.strftime(FluQ.root.join(@path).to_s.gsub("%t", tag))
        paths[path] << event
      end
      paths
    end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluq-0.8.1 lib/fluq/handler/log.rb
fluq-0.8.0 lib/fluq/handler/log.rb