Sha256: fdfe6a7496ee890bf60fe72cc08edb94a4ba623684327d9c25f9d66401dacb8d
Contents?: true
Size: 1.97 KB
Versions: 1
Compression:
Stored size: 1.97 KB
Contents
class Fwd::Output extend Forwardable def_delegators :core, :logger, :root, :prefix RESCUABLE = [ Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, Errno::EPIPE, Errno::ENETUNREACH, Errno::ENETDOWN, Errno::EINVAL, Errno::ETIMEDOUT, IOError, EOFError ].freeze attr_reader :pool, :core # Constructor # @param [Fwd] core def initialize(core) backends = Array(core.opts[:forward]).compact.map do |s| Fwd::Backend.new(s) end @core = core @pool = Fwd::Pool.new(backends) end # Callback def forward! return if @forwarding @forwarding = true while (q = closed_files) && (file = q.shift) ok = reserve(file) do |reserved| start = Time.now success = stream_file(reserved) real = Time.now - start logger.info { "Flushed #{reserved.basename}, #{reserved.size.fdiv(1024).round}k in #{real.round(1)}s (Q: #{q.size})" } success end ok || break end ensure @forwarding = false end # @param [Pathname] file file to stream def stream_file(file) pool.any? do |backend| stream_to(backend, file) end end private def reserve(file) return if File.size(file) < 1 target = Pathname.new(file.sub(/\.closed$/, ".reserved")) FileUtils.mv file, target.to_s success = yield(target) if success target.unlink else logger.error "Flushing #{File.basename(file)} failed" FileUtils.mv target.to_s, file end success rescue Errno::ENOENT => e # Ignore if file was alread flushed by another process logger.warn "Flushing #{File.basename(file)} postponed: #{e.message}" end def stream_to(backend, file) backend.stream(file) true rescue *RESCUABLE => e logger.error "Backend #{backend} failed: #{e.class.name} #{e.message}" backend.close false end def closed_files Dir[root.join("#{prefix}.*.closed")].sort end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fwd-0.4.0 | lib/fwd/output.rb |