# -*- encoding: binary -*-
require "kgio"
require "kcar"

# Top-level namespace: error classes plus helpers that drive a batch of
# request objects to completion.  The helpers only rely on each request
# responding to +resume+, +error+ and +error=+.
module HTTP_Spew
  autoload :ChunkyPipe, "http_spew/chunky_pipe"
  autoload :ContentMD5, "http_spew/content_md5"
  autoload :HitNRun, "http_spew/hit_n_run"
  autoload :InputSpray, "http_spew/input_spray"

  # Base class for the errors defined in this module.
  class Error < RuntimeError; end

  # Assigned to requests that were still pending when HTTP_Spew.wait
  # ran out of time.
  class TimeoutError < Error; end

  # Assigned to requests that were still pending when enough other
  # requests had already completed.
  class ConnectionReset < Error; end

  # Sets +error+ on every request in +requests+ that does not already
  # carry one.  Returns +requests+.
  def self.error_all(requests, error) # :nodoc:
    requests.each { |req| req.error ||= error }
  end

  # Finishes a wait that gathered enough completed requests: appends
  # +failed+ to +ready+, flags whatever in +requests+ is still
  # unaccounted for with ConnectionReset, and returns +ready+ without
  # duplicates.
  def self.done_early(ready, failed, requests) # :nodoc:
    ready.concat(failed)
    pending = requests - ready
    unless pending.empty?
      error = ConnectionReset.new("prematurely terminated")
      ready.concat(error_all(pending, error))
    end
    ready.uniq!
    ready
  end

  # Resumes every request in +requests+ exactly once, without polling.
  #
  # Returns an array of requests that are complete, including those
  # that have errored out, or +nil+ if none are.  Incomplete requests
  # remain in +requests+; complete and failed ones are removed from it.
  # If +need+ is fulfilled, all incomplete requests are flagged with
  # ConnectionReset and every request is returned.
  def self.wait_nonblock!(need, requests)
    ready, failed = [], []
    requests.delete_if do |req|
      begin
        case req.resume
        when Symbol # :wait_writable, :wait_readable
          false # still in progress, keep it in +requests+
        else
          (ready << req).size == need and
            return done_early(ready, failed, requests)
          true
        end
      rescue => e
        req.error = e
        failed << req # truthy, so the failed request is dropped as well
      end
    end
    ready.concat(failed).empty? ? nil : ready
  end

  # Resumes +requests+ repeatedly, polling between passes, for at most
  # +timeout+ milliseconds.
  #
  # Returns an array of requests that are complete, including those
  # that have errored out.  Requests still pending when the time runs
  # out are flagged with TimeoutError and returned after the others.
  # If +need+ is fulfilled, all incomplete requests are flagged with
  # ConnectionReset instead and the method returns immediately.
  def self.wait(need, requests, timeout)
    ready, failed = [], []
    pollset = {}
    begin
      requests.each do |req|
        begin
          case rv = req.resume
          when Symbol # :wait_writable, :wait_readable
            pollset[req] = rv
          else
            (ready << req).size == need and
              return done_early(ready, failed, requests)
            pollset.delete(req)
          end
        rescue => e
          req.error = e
          failed << req
          pollset.delete(req)
        end
      end
      break if pollset.empty?

      t0 = Time.now
      busy = pollset.keys
      rv = Kgio.poll(pollset, timeout.to_i) or break
      timeout -= (Time.now - t0) * 1000
    rescue Errno::EINTR
      timeout -= (Time.now - t0) * 1000
      # Requests that finished during the interrupted pass must not be
      # resumed again, so only the ones that were being polled go around.
      requests = busy
      # Retrying with an exhausted budget could hand poll a negative
      # timeout, which means "block indefinitely".
      retry if timeout > 0.0
      # Array#uniq (not uniq!) below: uniq! returns nil when nothing is
      # removed, which would end the loop with +requests+ set to nil.
      # Requests reported by poll are put first for the next pass.
    end while timeout > 0.0 && requests = rv.keys.concat(busy).uniq

    ready.concat(failed)
    # Only requests that neither completed nor failed have timed out.
    pending = requests - ready
    unless pending.empty?
      ready.concat(error_all(pending, TimeoutError.new("request timed out")))
    end
    ready.uniq!
    ready
  end
end

require "http_spew/headers"
require "http_spew/request"