Sha256: 613e20f3d02df79774ba64b0d23b597bf2df0a7695861e33777f51a4ea76e519
Contents?: true
Size: 1.52 KB
Versions: 1
Compression:
Stored size: 1.52 KB
Contents
module Flume class LogDevice lazy_accessor :redis lazy_accessor :cap lazy_accessor :step lazy_accessor :cycle lazy_accessor :list def initialize(*args, &block) options = args.last.is_a?(Hash) ? args.pop : {} @config = OpenStruct.new(options) block.call(@config) if block @redis = @config.redis || proc { Redis.new } @cap = @config.cap || (2 ** 16) @step = @config.step || 0 @cycle = @config.cycle || (2 ** 8) @list = @config.list || 'flume:log' end def channel "flume:#{list}" end def write(message) begin redis.lpush(list, message) rescue Object => e error = "#{ e.message } (#{ e.class })\n#{ Array(e.backtrace).join(10.chr) }" STDERR.puts(error) STDERR.puts(message) end ensure redis.publish(channel, message) if (step % cycle).zero? truncate(cap) rescue nil end self.step = (step + 1) % cycle end def close redis.quit rescue nil end def tail(n = 80) redis.lrange(list, 0, n - 1).reverse end def tailf(&block) begin redis.subscribe(channel) do |on| on.message do |channel, message| block.call(message) end end rescue Redis::BaseConnectionError => error puts "#{error}, retrying in 1s" sleep 1 retry end end def truncate(n) redis.ltrim(list, 0, n - 1) end def size redis.llen(list) end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
flume-0.0.3 | lib/flume/log_device.rb |