Sha256: 0b69420070fbb6243c44d20bc7dd107106e6a70950c5d4396082d5de24fdecc3
Contents?: true
Size: 1.95 KB
Versions: 1
Compression:
Stored size: 1.95 KB
Contents
class Fwd::Output extend Forwardable def_delegators :core, :logger, :root, :prefix CHUNK_SIZE = 16 * 1024 RESCUABLE = [ Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, Errno::EPIPE, Errno::ENETUNREACH, Errno::ENETDOWN, Errno::EINVAL, Errno::ETIMEDOUT, IOError, EOFError ].freeze attr_reader :pool, :core # Constructor # @param [Fwd] core def initialize(core) backends = Array(core.opts[:forward]).compact.map do |s| Fwd::Backend.new(s) end @core = core @pool = Fwd::Pool.new(backends) end # Callback def forward! return if @forwarding @forwarding = true begin Dir[root.join("#{prefix}.*.closed")].sort.each do |file| ok = reserve(file) do |io| logger.debug { "Flushing #{File.basename(io.path)}, #{io.size.fdiv(1024).round} kB" } write(io) end ok or break end ensure @forwarding = false end end # @param [IO] io source stream def write(io) pool.any? do |backend| forward(backend, io) end end private def reserve(file) return if File.size(file) < 1 target = Pathname.new(file.sub(/\.closed$/, ".reserved")) FileUtils.mv file, target.to_s result = false target.open("r") do |io| result = yield(io) end if result target.unlink else logger.error "Flushing of #{target} failed." FileUtils.mv target.to_s, target.to_s.sub(/\.reserved$/, ".closed") end result rescue Errno::ENOENT => e # Ignore if file was alread flushed by another process logger.warn "Flushing of #{File.basename(file)} postponed: #{e.message}" end def forward(backend, io) io.rewind until io.eof? backend.write(io.read(CHUNK_SIZE)) end true rescue *RESCUABLE => e logger.error "Backend #{backend} failed: #{e.class.name} #{e.message}" backend.close false end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fwd-0.3.3 | lib/fwd/output.rb |