# -*- encoding: binary -*- # # Use this to wrap and replace your input object for spraying to multiple # servers. class HTTP_Spew::InputSpray attr_reader :readers class NoWritersError < HTTP_Spew::Error end class SizedPipe < HTTP_Spew::ChunkyPipe attr_accessor :size end def initialize(env, nr) @input = env["rack.input"] size = @input.respond_to?(:size) ? @input.size : env["CONTENT_LENGTH"] size = size ? size.to_i : nil klass = size ? SizedPipe : HTTP_Spew::ChunkyPipe @readers, @writers = [], [] nr.times do r, w = klass.new r.size = size if size @readers << r @writers << w end @wr = start_write_driver end def write_fail?(wr, buf) wr.kgio_write(buf) false rescue wr.close true end # TODO: splice(2) if @input is an IO def start_write_driver Thread.new do begin buf = "" while buf = @input.read(0x4000, buf) @writers.delete_if { |wr| write_fail?(wr, buf) }.empty? and raise NoWritersError, "all writers have died" end rescue => e @readers.each { |io| io.error = e } ensure @writers.each { |io| io.close unless io.closed? } end end end end