Sha256: 58e529243a05df95c480865bfcc550ab911c7059c447bacfe661ce33f57c02f2

Contents?: true

Size: 1.62 KB

Versions: 2

Compression:

Stored size: 1.62 KB

Contents

module Muxer
  # Runs a batch of requests inside a single EventMachine reactor and sorts
  # them into succeeded / failed / pending buckets.
  #
  # Usage, as far as this file shows: queue requests with {#add_url},
  # optionally set an overall +timeout+ (seconds), then call {#execute}.
  class Multiplexer
    # Seconds between polls of the pending requests while the reactor runs.
    POLL_INTERVAL = 0.001

    # @return [Array] the requests queued so far via {#add_url}
    attr_reader :requests

    # Overall time limit, in seconds, for the whole batch. When nil (the
    # default) only the per-request timeouts apply.
    attr_writer :timeout

    def initialize
      @requests = []
      @timeout = nil
    end

    # Queues a request for +url+.
    #
    # @param url the URL handed to the new Request
    # @param timeout optional per-request timeout in seconds; when omitted the
    #   Request keeps whatever timeout it was constructed with
    # @return [Array] the list of queued requests
    def add_url(url, timeout = nil)
      request = Request.new
      request.url = url
      request.timeout = timeout if timeout
      requests << request
    end

    # Starts every queued request and blocks until all of them have completed
    # or a timeout has expired.
    #
    # @return [Hash] +{succeeded: [...], failed: [...], pending: [...]}+;
    #   +:pending+ is empty by the time this returns
    def execute
      @responses = {succeeded: [], failed: [], pending: []}
      @start = Time.now
      EventMachine.run do
        # Whatever process! returns is what gets polled below; it must respond
        # to completed?, error and timeout.
        requests.each do |request|
          @responses[:pending] << request.process!
        end

        EM::PeriodicTimer.new(POLL_INTERVAL) do
          process_requests
        end
      end
      @responses
    end

    private

    # One polling tick: harvest finished requests, apply timeouts, and stop
    # the reactor once nothing is left pending.
    def process_requests
      process_pending
      process_timeouts

      EM.stop if @responses[:pending].empty?
    end

    # Moves completed requests out of +:pending+ into +:succeeded+ (no error)
    # or +:failed+ (error present).
    #
    # The pending list is split with +partition+ rather than deleted from
    # while iterating: removing elements from an array inside its own +each+
    # makes the iteration skip the element that follows each removal, so
    # completed requests could linger in +:pending+ and later be reported as
    # timed out.
    def process_pending
      completed, still_pending = @responses[:pending].partition(&:completed?)
      @responses[:pending] = still_pending

      completed.each do |request|
        bucket = request.error.nil? ? :succeeded : :failed
        @responses[bucket] << request
      end
    end

    # Fails everything still pending when either the overall timeout or the
    # longest per-request timeout among the pending requests has elapsed.
    def process_timeouts
      return unless deadline_passed?(@timeout) ||
                    deadline_passed?(@responses[:pending].map(&:timeout).max)

      finish_timeouts
    end

    # @param seconds [Numeric, nil] allowed duration measured from the start of
    #   {#execute}
    # @return [Boolean, nil] truthy when +seconds+ is set and has elapsed
    def deadline_passed?(seconds)
      seconds && (@start + seconds <= Time.now)
    end

    # Marks every pending request as failed and empties the pending list.
    # The reactor is stopped by process_requests, which sees the empty list
    # right after this returns.
    def finish_timeouts
      @responses[:failed].concat(@responses[:pending])
      @responses[:pending] = []
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
muxer-0.1.3 lib/muxer/multiplexer.rb
muxer-0.1.2 lib/muxer/multiplexer.rb