Sha256: 56c82d3943705d033a181d00fa3406dcdc644a82876e75cf3a53f296c2a7e307

Contents?: true

Size: 1001 Bytes

Versions: 5

Compression:

Stored size: 1001 Bytes

Contents

# -*- encoding: binary -*-
#
# Use this to wrap and replace your input object for spraying to multiple
# servers.
class HTTP_Spew::InputSpray
  # Builds +nr+ pipe pairs via HTTP_Spew::ChunkyPipe.new and immediately
  # starts the background thread that copies +input+ into every pipe.
  #
  # env   - hash consulted only for the default +input+ ("rack.input")
  # nr    - number of pipe pairs to create
  # input - object whose +read(length, buffer)+ is called by the write
  #         driver until it returns a falsy value
  def initialize(env, nr, input = env["rack.input"])
    @input = input
    # Keyed by object identity: each read end maps to its write end.
    @pipes = {}.compare_by_identity
    nr.times do
      reader, writer = HTTP_Spew::ChunkyPipe.new
      @pipes[reader] = writer
    end
    start_write_driver
  end

  # Returns the read ends of the pipes currently tracked. Pipes whose
  # write end has failed are dropped by the write driver, so this list
  # may shrink over time.
  def readers
    @pipes.keys
  end

  # Attempts to write +buf+ to +wr+.
  #
  # Returns false when the write succeeds. When the write raises a
  # StandardError, the exception is recorded on +rd+ via +rd.error=+,
  # +wr+ is closed, and true is returned.
  def write_fail?(rd, wr, buf)
    begin
      wr.write(buf)
    rescue => err
      rd.error = err
      wr.close
      return true
    end
    false
  end

  # TODO: splice(2) if @input is an IO
  #
  # Spawns (and returns) the thread that reads @input in chunks of up to
  # 0x4000 bytes and writes each chunk to every tracked pipe.
  def start_write_driver
    Thread.new do
      begin
        chunk = ""
        while @input.read(0x4000, chunk)
          # Forget any pipe whose write end just failed.
          @pipes.delete_if { |rd, wr| write_fail?(rd, wr, chunk) }
          if @pipes.empty?
            raise HTTP_Spew::NoWritersError, "all writers have died", []
          end
        end
      rescue => err
        # Propagate the failure to every reader that is still tracked.
        @pipes.each_key { |rd| rd.error = err }
      ensure
        # Always close the remaining write ends when the thread finishes.
        @pipes.each_value { |wr| wr.close unless wr.closed? }
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
http_spew-0.5.0 lib/http_spew/input_spray.rb
http_spew-0.4.1 lib/http_spew/input_spray.rb
http_spew-0.4.0 lib/http_spew/input_spray.rb
http_spew-0.3.0 lib/http_spew/input_spray.rb
http_spew-0.2.0 lib/http_spew/input_spray.rb