Sha256: 4e36675f6559cdb48e60e467cca7bb7c70edeb9ad6249524d9224d2c732fb5b0

Contents?: true

Size: 1.56 KB

Versions: 4

Compression:

Stored size: 1.56 KB

Contents

# Handler that appends events to log files on disk. The target file of each
# event is derived from the configured path pattern, the (rewritten) event
# tag and the event time; open file handles are cached in a FilePool.
class FluQ::Handler::Log < FluQ::Handler::Base

  # Cache of open, append-mode file handles, keyed by path string.
  class FilePool < TimedLRU

    # Returns the cached handle for `path`, opening (and caching) it on a miss.
    # Missing parent directories are created before the file is opened.
    #
    # @param [#to_s] path the file path
    # @return [File] a file handle opened in "a+" mode
    def open(path)
      path = path.to_s
      self[path] ||= begin
        FileUtils.mkdir_p File.dirname(path)
        file = File.open(path, "a+")
        file.autoclose = true
        file
      end
    end

  end

  # @attr_reader [FluQ::Handler::Log::FilePool] pool the pool of open log files
  attr_reader :pool

  # Reads the handler configuration: the full path pattern (resolved against
  # FluQ.root), the tag rewrite and event convert callables, and the file
  # pool limits.
  #
  # @see FluQ::Handler::Base#initialize
  def initialize(*)
    super
    @full_path = FluQ.root.join(config[:path]).to_s.freeze
    @rewrite   = config[:rewrite]
    @convert   = config[:convert]
    @pool      = FilePool.new max_size: config[:cache_max], ttl: config[:cache_ttl]
  end

  # Groups the events by target file and appends each group to its file.
  #
  # @see FluQ::Handler::Base#on_events
  def on_events(events)
    partition(events).each {|path, slice| write(path, slice) }
  end

  protected

    # Configuration defaults:
    #   path      - path pattern; "%t" is replaced with the rewritten tag, the
    #               rest is expanded via strftime using the event time
    #   rewrite   - callable turning an event tag into a path fragment
    #   convert   - callable turning an event into one output line
    #   cache_max - passed to the file pool as max_size
    #   cache_ttl - passed to the file pool as ttl
    def defaults
      super.merge \
        path: "log/raw/%t/%Y%m%d/%H.log",
        rewrite:  lambda {|tag| tag.gsub(".", "/") },
        convert:  lambda {|event| event.to_tsv },
        cache_max: 100,
        cache_ttl: 300
    end

    # Appends all events of `slice` to the file at `path`, one line per event.
    #
    # On IOError the pooled handle is discarded and the write is attempted
    # again with a freshly opened file, up to three attempts in total.
    #
    # @param [#to_s] path the target file path
    # @param [Array] slice the events to write
    # @param [Integer] attempts number of attempts already made
    # @return [Array] the given slice
    # @raise [IOError] if the third attempt fails as well
    def write(path, slice, attempts = 0)
      # Render everything up front and emit it with a single write call, so
      # that a retry neither re-runs the converter nor re-appends lines that
      # an earlier, partially successful attempt had already written.
      buffer = slice.map {|event| "#{@convert.call(event)}\n" }.join

      begin
        @pool.open(path).write(buffer)
      rescue IOError
        @pool.delete path.to_s
        attempts += 1
        retry if attempts < 3
        raise
      end
      slice
    end

    # Groups events by the file they belong to.
    #
    # @param [Enumerable] events the events; each must respond to #tag and
    #   #time (time must respond to #strftime)
    # @return [Hash<String, Array>] events keyed by expanded file path, in
    #   order of first appearance
    def partition(events)
      events.each_with_object({}) do |event, paths|
        # Escape "%" so that a tag can never be mistaken for a strftime
        # directive; strftime turns "%%" back into a literal "%".
        tag     = @rewrite.call(event.tag).gsub("%", "%%")
        # Block form of gsub: the replacement is inserted verbatim, without
        # interpreting backslash sequences the tag may contain.
        pattern = @full_path.gsub("%t") { tag }
        path    = event.time.strftime(pattern)
        (paths[path] ||= []) << event
      end
    end

end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
fluq-0.7.5 lib/fluq/handler/log.rb
fluq-0.7.3 lib/fluq/handler/log.rb
fluq-0.7.1 lib/fluq/handler/log.rb
fluq-0.7.0 lib/fluq/handler/log.rb