lib/servent/event_source.rb in servent-0.0.1 vs lib/servent/event_source.rb in servent-0.1.0
- old
+ new
@@ -2,12 +2,14 @@
require "servent/event"
require "net/http"
module Servent
class EventSource
- attr_reader :ready_state
+ DEFAULT_HEADERS = { "Accept" => "text/event-stream" }
+ attr_reader :ready_state, :uri
+
def initialize(url, net_http_options: { read_timeout: 600 })
@uri = URI(url)
@net_http_options = net_http_options
@ready_state = Servent::CONNECTING
@@ -18,22 +20,33 @@
@proxy_config = ProxyConfig.new
yield @proxy_config if block_given?
end
def start(http_starter = Net::HTTP)
+ @http_starter ||= http_starter
params = HTTPStartParams.new(@uri, @proxy_config, @net_http_options)
- Thread.new {
- http_starter.start(*params.parameterize) do |http|
+ @thread = Thread.new {
+ @http_starter.start(*params.parameterize) do |http|
get = Net::HTTP::Get.new @uri
- headers.each { |header, value| get[header] = value }
+ DEFAULT_HEADERS.each { |header, value| get[header] = value }
yield http, get if block_given?
+
perform_request http, get
end
}
end
+ def listen(http_starter = Net::HTTP)
+ start(http_starter).join
+ end
+
+ def close
+ @ready_state = Servent::CLOSED
+ @thread.kill unless @thread.nil?
+ end
+
def on_open(&open_block)
@open_blocks << open_block
end
def on_message(&message_block)
@@ -44,32 +57,56 @@
@error_blocks << error_block
end
private
- def headers
- { "Accept" => "text/event-stream" }
- end
-
def perform_request(http, type)
http.request type do |response|
- # FIXME: response CAN have more than one mime type
- unless response["Content-Type"] == "text/event-stream"
- @ready_state = Servent::CLOSED
- @error_blocks.each { |block| block.call response, :wrong_mime_type }
- return
- end
+ return fail_connection response if should_fail? response
+ return schedule_reconnection if should_reconnect? response
+ store_new_parmanent_url response
- handle_response response
+ open_connection response
end
end
- def handle_response(response)
+ def open_connection(response)
@ready_state = Servent::OPEN
@open_blocks.each { |block| block.call response }
response.read_body do |chunk|
- @message_blocks.each { |block| block.call chunk }
+ # FIXME: use the same stream object to parse
+ # different chunks.
+ stream = Stream.new chunk
+ events = stream.parse
+ events.each do |event|
+ @message_blocks.each { |block| block.call event }
+ end
end
+ end
+
+ def should_fail?(response)
+ return false if Servent::REDIRECT_STATUSES.include?(response.code.to_i)
+ (response["Content-Type"] != "text/event-stream") ||
+ !Servent::KNOWN_STATUSES.include?(response.code.to_i)
+ end
+
+ def fail_connection(response)
+ @ready_state = Servent::CLOSED
+ @error_blocks.each { |block| block.call response, :wrong_mime_type }
+ end
+
+ def should_reconnect?(response)
+ Servent::RECONNECTION_STATUSES.include? response.code.to_i
+ end
+
+ def schedule_reconnection
+ start
+ end
+
+ def store_new_parmanent_url(response)
+ return unless response.code.to_i == 301
+ @original_uri = @uri
+ @uri = URI(response["Location"])
end
end
class ProxyConfig
attr_accessor :host, :user, :pass