lib/fwd/output.rb in fwd-0.3.3 vs lib/fwd/output.rb in fwd-0.4.0
- old
+ new
@@ -1,10 +1,9 @@
class Fwd::Output
extend Forwardable
def_delegators :core, :logger, :root, :prefix
- CHUNK_SIZE = 16 * 1024
RESCUABLE = [
Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, Errno::EPIPE,
Errno::ENETUNREACH, Errno::ENETDOWN, Errno::EINVAL, Errno::ETIMEDOUT,
IOError, EOFError
].freeze
@@ -24,27 +23,28 @@
# Callback
def forward!
return if @forwarding
@forwarding = true
- begin
- Dir[root.join("#{prefix}.*.closed")].sort.each do |file|
- ok = reserve(file) do |io|
- logger.debug { "Flushing #{File.basename(io.path)}, #{io.size.fdiv(1024).round} kB" }
- write(io)
- end
- ok or break
+ while (q = closed_files) && (file = q.shift)
+ ok = reserve(file) do |reserved|
+ start = Time.now
+ success = stream_file(reserved)
+ real = Time.now - start
+ logger.info { "Flushed #{reserved.basename}, #{reserved.size.fdiv(1024).round}k in #{real.round(1)}s (Q: #{q.size})" }
+ success
end
- ensure
- @forwarding = false
+ ok || break
end
+ ensure
+ @forwarding = false
end
- # @param [IO] io source stream
- def write(io)
+ # @param [Pathname] file file to stream
+ def stream_file(file)
pool.any? do |backend|
- forward(backend, io)
+ stream_to(backend, file)
end
end
private
@@ -52,36 +52,33 @@
return if File.size(file) < 1
target = Pathname.new(file.sub(/\.closed$/, ".reserved"))
FileUtils.mv file, target.to_s
- result = false
- target.open("r") do |io|
- result = yield(io)
- end
-
- if result
+ success = yield(target)
+ if success
target.unlink
else
- logger.error "Flushing of #{target} failed."
- FileUtils.mv target.to_s, target.to_s.sub(/\.reserved$/, ".closed")
+ logger.error "Flushing #{File.basename(file)} failed"
+ FileUtils.mv target.to_s, file
end
- result
+ success
rescue Errno::ENOENT => e
# Ignore if file was alread flushed by another process
- logger.warn "Flushing of #{File.basename(file)} postponed: #{e.message}"
+ logger.warn "Flushing #{File.basename(file)} postponed: #{e.message}"
end
- def forward(backend, io)
- io.rewind
- until io.eof?
- backend.write(io.read(CHUNK_SIZE))
- end
+ def stream_to(backend, file)
+ backend.stream(file)
true
rescue *RESCUABLE => e
logger.error "Backend #{backend} failed: #{e.class.name} #{e.message}"
backend.close
false
+ end
+
+ def closed_files
+ Dir[root.join("#{prefix}.*.closed")].sort
end
end
\ No newline at end of file