Sha256: 2cac28b477abf00bae05bcf98258bedebd5ed54e0af5925ff2c677a9216d5290
Contents?: true
Size: 1.61 KB
Versions: 2
Compression:
Stored size: 1.61 KB
Contents
class Fwd::Output extend Forwardable def_delegators :core, :logger, :root, :prefix RESCUABLE = [ Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, Errno::EPIPE, Errno::ENETUNREACH, Errno::ENETDOWN, Errno::EINVAL, Errno::ETIMEDOUT, IOError, EOFError ].freeze attr_reader :pool, :core # Constructor # @param [Fwd] core def initialize(core) backends = Array(core.opts[:forward]).compact.map do |s| Fwd::Backend.new(s) end @core = core @pool = Fwd::Pool.new(backends) end # Callback def forward! Dir[root.join("#{prefix}.*.closed")].each do |file| ok = reserve(file) do |data| logger.debug { "Flushing #{File.basename(file)}, #{data.size.fdiv(1024).round} kB" } write(data) end break unless ok end end # @param [String] binary data def write(data) pool.any? do |backend| forward(backend, data) end end private def reserve(file) return if File.size(file) < 1 target = Pathname.new(file.sub(/\.closed$/, ".reserved")) FileUtils.mv file, target.to_s result = yield(target.read) if result target.unlink else logger.error "Flushing of #{target} failed." FileUtils.mv target.to_s, target.to_s.sub(/\.reserved$/, ".closed") end result rescue Errno::ENOENT # Ignore if file was alread flushed by another process end def forward(backend, data) backend.write(data) && true rescue *RESCUABLE => e logger.error "Backend #{backend} failed: #{e.class.name} #{e.message}" backend.close false end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fwd-0.3.2 | lib/fwd/output.rb |
fwd-0.3.1 | lib/fwd/output.rb |