lib/fwd/output.rb in fwd-0.3.3 vs lib/fwd/output.rb in fwd-0.4.0

- old
+ new

@@ -1,10 +1,9 @@ class Fwd::Output extend Forwardable def_delegators :core, :logger, :root, :prefix - CHUNK_SIZE = 16 * 1024 RESCUABLE = [ Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, Errno::EPIPE, Errno::ENETUNREACH, Errno::ENETDOWN, Errno::EINVAL, Errno::ETIMEDOUT, IOError, EOFError ].freeze @@ -24,27 +23,28 @@ # Callback def forward! return if @forwarding @forwarding = true - begin - Dir[root.join("#{prefix}.*.closed")].sort.each do |file| - ok = reserve(file) do |io| - logger.debug { "Flushing #{File.basename(io.path)}, #{io.size.fdiv(1024).round} kB" } - write(io) - end - ok or break + while (q = closed_files) && (file = q.shift) + ok = reserve(file) do |reserved| + start = Time.now + success = stream_file(reserved) + real = Time.now - start + logger.info { "Flushed #{reserved.basename}, #{reserved.size.fdiv(1024).round}k in #{real.round(1)}s (Q: #{q.size})" } + success end - ensure - @forwarding = false + ok || break end + ensure + @forwarding = false end - # @param [IO] io source stream - def write(io) + # @param [Pathname] file file to stream + def stream_file(file) pool.any? do |backend| - forward(backend, io) + stream_to(backend, file) end end private @@ -52,36 +52,33 @@ return if File.size(file) < 1 target = Pathname.new(file.sub(/\.closed$/, ".reserved")) FileUtils.mv file, target.to_s - result = false - target.open("r") do |io| - result = yield(io) - end - - if result + success = yield(target) + if success target.unlink else - logger.error "Flushing of #{target} failed." - FileUtils.mv target.to_s, target.to_s.sub(/\.reserved$/, ".closed") + logger.error "Flushing #{File.basename(file)} failed" + FileUtils.mv target.to_s, file end - result + success rescue Errno::ENOENT => e # Ignore if file was alread flushed by another process - logger.warn "Flushing of #{File.basename(file)} postponed: #{e.message}" + logger.warn "Flushing #{File.basename(file)} postponed: #{e.message}" end - def forward(backend, io) - io.rewind - until io.eof? - backend.write(io.read(CHUNK_SIZE)) - end + def stream_to(backend, file) + backend.stream(file) true rescue *RESCUABLE => e logger.error "Backend #{backend} failed: #{e.class.name} #{e.message}" backend.close false + end + + def closed_files + Dir[root.join("#{prefix}.*.closed")].sort end end \ No newline at end of file