lib/servent/event_source.rb in servent-0.0.1 vs lib/servent/event_source.rb in servent-0.1.0

- old
+ new

@@ -2,12 +2,14 @@ require "servent/event" require "net/http" module Servent class EventSource - attr_reader :ready_state + DEFAULT_HEADERS = { "Accept" => "text/event-stream" } + attr_reader :ready_state, :uri + def initialize(url, net_http_options: { read_timeout: 600 }) @uri = URI(url) @net_http_options = net_http_options @ready_state = Servent::CONNECTING @@ -18,22 +20,33 @@ @proxy_config = ProxyConfig.new yield @proxy_config if block_given? end def start(http_starter = Net::HTTP) + @http_starter ||= http_starter params = HTTPStartParams.new(@uri, @proxy_config, @net_http_options) - Thread.new { - http_starter.start(*params.parameterize) do |http| + @thread = Thread.new { + @http_starter.start(*params.parameterize) do |http| get = Net::HTTP::Get.new @uri - headers.each { |header, value| get[header] = value } + DEFAULT_HEADERS.each { |header, value| get[header] = value } yield http, get if block_given? + perform_request http, get end } end + def listen(http_starter = Net::HTTP) + start(http_starter).join + end + + def close + @ready_state = Servent::CLOSED + @thread.kill unless @thread.nil? + end + def on_open(&open_block) @open_blocks << open_block end def on_message(&message_block) @@ -44,32 +57,56 @@ @error_blocks << error_block end private - def headers - { "Accept" => "text/event-stream" } - end - def perform_request(http, type) http.request type do |response| - # FIXME: response CAN have more than one mime type - unless response["Content-Type"] == "text/event-stream" - @ready_state = Servent::CLOSED - @error_blocks.each { |block| block.call response, :wrong_mime_type } - return - end + return fail_connection response if should_fail? response + return schedule_reconnection if should_reconnect? response + store_new_parmanent_url response - handle_response response + open_connection response end end - def handle_response(response) + def open_connection(response) @ready_state = Servent::OPEN @open_blocks.each { |block| block.call response } response.read_body do |chunk| - @message_blocks.each { |block| block.call chunk } + # FIXME: use the same stream object to parse + # different chunks. + stream = Stream.new chunk + events = stream.parse + events.each do |event| + @message_blocks.each { |block| block.call event } + end end + end + + def should_fail?(response) + return false if Servent::REDIRECT_STATUSES.include?(response.code.to_i) + (response["Content-Type"] != "text/event-stream") || + !Servent::KNOWN_STATUSES.include?(response.code.to_i) + end + + def fail_connection(response) + @ready_state = Servent::CLOSED + @error_blocks.each { |block| block.call response, :wrong_mime_type } + end + + def should_reconnect?(response) + Servent::RECONNECTION_STATUSES.include? response.code.to_i + end + + def schedule_reconnection + start + end + + def store_new_parmanent_url(response) + return unless response.code.to_i == 301 + @original_uri = @uri + @uri = URI(response["Location"]) end end class ProxyConfig attr_accessor :host, :user, :pass