module Muxer # Multiplexer is the core class of Muxer that actually multiplexes # web requests. Multiplexer has a lists of Muxer::Requests that will # be executed and added to the completed or failed response when the # timeouts have been reached. # # @!attribute requests # @return [Array] Muxer::Requests that are setup in this Multiplexer # @!attribute timeout # @return [Number] Seconds for the timeout class Multiplexer attr_reader :requests attr_writer :timeout # multiplexer = Multiplexer.new def initialize @requests = [] @timeout = nil end # add_url builds a Request object and passes it to add_request # # m.add_url('https://www.google.com', {timeout: 3}) # gives a 3 second # timeout to a request to https://www.google.com # # url is merely the target URL # # `options` is a hash describing the web request # { # timeout: nil, # method: :get, # params: {}, # redirects: nil # } # # @param url [string] The URL for the web request # @param options [{symbol => Object}] The parameters for the web request # @return true def add_url(url, options = {}) options.keys.each do |key| options[key.to_sym] = options.delete(key) end options = {timeout: nil, method: :get, params: {}, redirects: nil, id: nil}.merge(options) timeout = request = Request.new request.url = url options.each do |key, val| next unless request.respond_to? ("#{key}=".to_sym) request.send("#{key}=".to_sym, val) if val end add_request request true end # add_request adds a request to Multiplexer # # request = Muxer::Request.new # request.url = 'https://www.google.com' # request.timeout = 3 # m.add_request request # # gives a 3 second timeout to a request to https://www.google.com # # @param request [Muxer::Request] the Request to add to the multiplexer # @return true def add_request(request) requests << request true end # executes the actual event loop that manages creating, sending, # and processing the finished / timed out web requests # # @return [Hash] Keys are :succeeded, :failed def execute @responses = {succeeded: [], failed: [], pending: [], succeeded_by_id: {}} @start = Time.now EventMachine.run do requests.each do |request| @responses[:pending] << request.process! end EM::PeriodicTimer.new(0.001) do process_requests end end @responses end private def process_requests process_pending process_timeouts if @responses[:pending].empty? EM.stop end end def process_pending @responses[:pending].each do |pending| if pending.completed? @responses[:pending].delete(pending) if pending.error.nil? @responses[:succeeded] << pending if pending.id @responses[:succeeded_by_id][pending.id] = pending end else @responses[:failed] << pending end end end end def process_timeouts if @timeout && (@start + @timeout <= Time.now) finish_timeouts return end highest_remaining_timeout = @responses[:pending].map(&:timeout).max if highest_remaining_timeout && (@start + highest_remaining_timeout <= Time.now) finish_timeouts end end def finish_timeouts @responses[:pending].each do |pending| @responses[:failed] << pending end @responses[:pending] = [] EM.stop end end end