require "thread" module HTTP_Spew::ClassMethods def error_all(requests, error) # :nodoc: requests.each { |req| req.error ||= error } end def done_early(ready, failed, requests) # :nodoc: ready.concat(failed) pending = requests - ready unless pending.empty? error = HTTP_Spew::ConnectionReset.new("prematurely terminated") ready.concat(error_all(pending, error)) end ready.uniq! ready end # Returns an array of requests that are complete, including those # that have errored out. Incomplete requests remain in +requests+ # If +need+ is fullfilled, it closes all incomplete requests and # returns all requests. def wait_nonblock!(need, requests) ready, failed = [], [] requests.delete_if do |req| begin case req.resume when Symbol # :wait_writable, :wait_readable false else (ready << req).size == need and return done_early(ready, failed, requests) true end rescue => e req.error = e failed << req end end ready.concat(failed).empty? ? nil : ready end def with_timeout(t) t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) yield ensure t[0] -= Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 t[0] = 0.0 if t[0] < 0 end # Returns an array of requests that are complete, including those # that have errored out. # If +need+ is fullfilled, it closes all incomplete requests. def wait_mt(need, requests, timeout) ready, failed = [], [] r, w = Kgio::Pipe.new active = [] t = [ timeout ] requests.each_with_index do |req, i| active << Thread.new do begin rv = req.run(timeout) w.write([ i ].pack("v".freeze)) rv rescue => err err end end end begin with_timeout(t) { r.kgio_wait_readable(t[0]) } req_idx = r.read(2).unpack("v".freeze)[0] thr = active[req_idx] with_timeout(t) { thr.join(t[0]) } rv = thr.value (Array === rv ? ready : failed) << requests[req_idx] ready.size == need and return done_early(ready, failed, requests) end until t[0] < 0.0 || (ready.size + failed.size) == requests.size ready.concat(failed) pending = requests - ready error = HTTP_Spew::TimeoutError.new("request timed out") ready.concat(error_all(pending, error)) ensure w.close r.close end def wait(need, requests, timeout) ready, failed = [], [] pollset = {} t = [ timeout ] begin requests.each do |req| begin case rv = req.resume when Symbol # :wait_writable, :wait_readable pollset[req] = rv else (ready << req).size == need and return done_early(ready, failed, requests) pollset.delete(req) end rescue => e req.error = e failed << req pollset.delete(req) end end break if pollset.empty? busy = pollset.keys rv = with_timeout(t) { Kgio.poll(pollset, (t[0] * 1000).to_i) } or break end while t[0] > 0.0 && requests = rv.keys.concat(busy).uniq! ready.concat(failed) unless requests.empty? error = HTTP_Spew::TimeoutError.new("request timed out") ready.concat(error_all(requests, error)) ready.uniq! end ready end end