Sha256: b36e6af7bd74d22940c102e1a7d7f264131b9675d94a1fe0e4b912aec862c30b
Contents?: true
Size: 1.55 KB
Versions: 1
Compression:
Stored size: 1.55 KB
Contents
# Flushes closed buffer files found under the core's root directory to a pool
# of forwarding backends.
class Fwd::Output
  extend Forwardable
  def_delegators :core, :logger, :root, :prefix

  # Errors raised while writing to a backend that are logged and treated as a
  # failed attempt on that backend instead of being propagated.
  RESCUABLE = [
    Errno::ECONNREFUSED,
    Errno::ECONNRESET,
    Errno::EHOSTUNREACH,
    Errno::EPIPE,
    Errno::ENETUNREACH,
    Errno::ENETDOWN,
    Errno::EINVAL,
    Errno::ETIMEDOUT,
    IOError,
    EOFError
  ].freeze

  attr_reader :pool, :core

  # Constructor
  #
  # Builds one Fwd::Backend per non-nil entry of `core.opts[:forward]` and
  # wraps them in a Fwd::Pool.
  #
  # @param [Fwd] core
  def initialize(core)
    backends = Array(core.opts[:forward]).compact.map do |s|
      Fwd::Backend.new(s)
    end
    @core = core
    @pool = Fwd::Pool.new(backends)
  end

  # Callback
  #
  # Reserves every file matching "<root>/<prefix>.*.closed" and writes its
  # contents to the pool.
  def forward!
    Dir[root.join("#{prefix}.*.closed")].each do |file|
      reserve(file) do |data|
        # bytesize, not size: the payload is read in the default external
        # encoding, where size counts characters rather than bytes.
        logger.debug { "Flushing #{File.basename(file)}, #{data.bytesize.fdiv(1024).round} kB" }
        write(data)
      end
    end
  end

  # Offers the data to the backends in the pool.
  #
  # @param [String] binary data
  # @return the result of `pool.any?` over the forwarding attempts
  #   (presumably true once a backend accepts the data -- depends on Fwd::Pool)
  def write(data)
    pool.any? do |backend|
      forward(backend, data)
    end
  end

  private

  # Renames a ".closed" file to ".reserved", yields its contents and then
  # either deletes it (block returned a truthy value) or renames it back to
  # ".closed" so that a later #forward! picks it up again.
  #
  # Files smaller than one byte are skipped.
  #
  # @param [String] file path of the closed file
  # @yieldparam [String] data the file contents
  # @yieldreturn truthy when the data was flushed successfully
  def reserve(file)
    return if File.size(file) < 1

    # \z anchors at the very end of the path, even if it contains newlines.
    target = Pathname.new(file.sub(/\.closed\z/, ".reserved"))
    FileUtils.mv file, target.to_s

    flushed = false
    begin
      flushed = yield(target.read)
      logger.error "Flushing of #{target} failed." unless flushed
    ensure
      # Also runs when the block raises, so an unexpected error cannot strand
      # the data in a ".reserved" file that #forward! never looks at again.
      if flushed
        target.unlink
      else
        FileUtils.mv target.to_s, target.to_s.sub(/\.reserved\z/, ".closed")
      end
    end
  rescue Errno::ENOENT
    # Ignore if file was already flushed by another process
  end

  # Writes data to a single backend.
  #
  # @return true when `backend.write` returns a truthy value, its falsy
  #   result otherwise, and false when a RESCUABLE error was raised (the
  #   error is logged and the backend closed)
  def forward(backend, data)
    backend.write(data) && true
  rescue *RESCUABLE => e
    logger.error "Backend #{backend} failed: #{e.class.name} #{e.message}"
    backend.close
    false
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fwd-0.3.0 | lib/fwd/output.rb |