Sha256: 45909f977b7a84dd642a41921d0b36f1757bee62f4a25c93a7673bc3eb2a680d

Contents?: true

Size: 1.57 KB

Versions: 1

Compression:

Stored size: 1.57 KB

Contents

# frozen_string_literal: true

require "httpx/selector"
require "httpx/channel"

module HTTPX
  # Tracks a set of channels and drives their IO through a Selector.
  #
  # Each channel built through #build_channel is registered with the selector
  # and kept in an internal list until the channel emits +:close+, at which
  # point it is removed from the list and deregistered from the selector.
  class Connection
    # +options+ is anything Options.new accepts. The timeout object used as
    # the default for #next_tick is read from the normalized options.
    def initialize(options)
      @options = Options.new(options)
      # Read from the normalized options, not from the raw argument, so that
      # every input Options.new understands also works here.
      @timeout = @options.timeout
      @selector = Selector.new
      @channels = []
    end

    # Returns true while at least one channel is still being tracked.
    def running?
      !@channels.empty?
    end

    # Runs one selector pass: every monitor yielded by the selector has its
    # channel consumed and its interests refreshed from the channel.
    #
    # +timeout+ is handed to Selector#select as is; it defaults to the
    # +timeout+ value of the timeout object taken from the options.
    def next_tick(timeout: @timeout.timeout)
      @selector.select(timeout) do |monitor|
        channel = monitor.value
        # A monitor without an attached channel has nothing to consume and
        # no interests to copy from.
        next unless channel

        consume(channel)
        monitor.interests = channel.interests
      end
    end

    # Closes +channel+ when one is given. Without an argument, closes every
    # tracked channel and keeps ticking until the selector reports empty.
    def close(channel = nil)
      if channel
        channel.close
      else
        # Iterate over a snapshot: the :close handler installed by
        # #register_channel deletes from @channels, and deleting from an
        # array while iterating it skips elements.
        @channels.dup.each(&:close)
        next_tick until @selector.empty?
      end
    end

    # Calls +reset+ on every tracked channel and returns the list of channels
    # still tracked afterwards.
    def reset
      # Snapshot for the same reason as in #close: a channel that emits
      # :close while resetting removes itself from @channels.
      @channels.dup.each(&:reset)
      @channels
    end

    # Builds a channel for +uri+ through Channel.by, using the connection
    # options merged with the per-call +options+, registers it and returns it.
    def build_channel(uri, **options)
      channel = Channel.by(uri, @options.merge(options))
      register_channel(channel)
      channel
    end

    # Returns the first tracked channel whose +match?+ accepts +uri+, or nil
    # when there is none. This method only looks up; it never opens a channel.
    #
    # Many hostnames are reachable through the same IP, so reusing a matching
    # channel keeps the number of open channels as low as possible and
    # maximizes pipelining.
    def find_channel(uri)
      @channels.find do |channel|
        channel.match?(uri)
      end
    end

    private

    # Registers +channel+ with the selector (initially with the :w interest),
    # attaches it to the resulting monitor, and starts tracking it. The :close
    # handler undoes both the tracking and the selector registration.
    def register_channel(channel)
      monitor = @selector.register(channel, :w)
      monitor.value = channel
      channel.on(:close) do
        @channels.delete(channel)
        @selector.deregister(channel)
      end
      @channels << channel
    end

    # Invokes the channel. Whatever truthy value comes out of the
    # catch(:close) block (either thrown with :close or returned by
    # +channel.call+) is passed to #close.
    def consume(channel)
      ch = catch(:close) { channel.call }
      close(ch) if ch
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
httpx-0.0.1 lib/httpx/connection.rb