require 'eventmachine' require 'em/buftok' module Twitter class JSONStream < EventMachine::Connection MAX_LINE_LENGTH = 16*1024 # network failure reconnections NF_RECONNECT_START = 0.25 NF_RECONNECT_ADD = 0.25 NF_RECONNECT_MAX = 16 # app failure reconnections AF_RECONNECT_START = 10 AF_RECONNECT_MUL = 2 RECONNECT_MAX = 320 RETRIES_MAX = 10 DEFAULT_OPTIONS = { :method => 'GET', :path => '/', :content_type => "application/x-www-form-urlencoded", :content => '', :path => '/1/statuses/filter.json', :host => 'stream.twitter.com', :port => 80, :auth => 'test:test', :user_agent => 'TwitterStream', } attr_accessor :code attr_accessor :headers attr_accessor :nf_last_reconnect attr_accessor :af_last_reconnect attr_accessor :reconnect_retries def self.connect options = {} options = DEFAULT_OPTIONS.merge(options) EventMachine.connect options[:host], options[:port], self, options end def initialize options = {} @options = DEFAULT_OPTIONS.merge(options) # merge in case initialize called directly @gracefully_closed = false @nf_last_reconnect = nil @af_last_reconnect = nil @reconnect_retries = 0 end def each_item &block @each_item_callback = block end def on_error &block @error_callback = block end def on_reconnect &block @reconnect_callback = block end def on_max_reconnects &block @max_reconnects_callback = block end def stop @gracefully_closed = true close_connection end def unbind receive_line(@buffer.flush) unless @buffer.empty? schedule_reconnect unless @gracefully_closed end def receive_data data begin @buffer.extract(data).each do |line| receive_line(line) end rescue Exception => e receive_error(e.message) close_connection return end end def connection_completed reset_state send_request end protected def schedule_reconnect timeout = reconnect_timeout @reconnect_retries += 1 if timeout <= RECONNECT_MAX && @reconnect_retries <= RETRIES_MAX reconnect_after(timeout) else @max_reconnects_callback.call(timeout, @reconnect_retries) if @max_reconnects_callback end end def reconnect_after timeout @reconnect_callback.call(timeout, @reconnect_retries) if @reconnect_callback EventMachine.add_timer(timeout) do reconnect @options[:host], @options[:port] end end def reconnect_timeout if (@code == 0) # network failure if @nf_last_reconnect @nf_last_reconnect += NF_RECONNECT_ADD else @nf_last_reconnect = NF_RECONNECT_START end [@nf_last_reconnect,NF_RECONNECT_MAX].min else if @af_last_reconnect @af_last_reconnect *= AF_RECONNECT_MUL else @af_last_reconnect = AF_RECONNECT_START end @af_last_reconnect end end def reset_state @code = 0 @headers = [] @state = :init @buffer = BufferedTokenizer.new("\r", MAX_LINE_LENGTH) end def send_request data = [] data << "#{@options[:method]} #{@options[:path]} HTTP/1.1" data << "Host: #{@options[:host]}" data << "User-agent: #{@options[:user_agent]}" if @options[:user_agent] data << "Authorization: Basic " + [@options[:auth]].pack('m').delete("\r\n") if @options[:method] == 'POST' data << "Content-type: #{@options[:content_type]}" data << "Content-length: #{@options[:content].length}" end data << "\r\n" send_data data.join("\r\n") + @options[:content] end def receive_line ln case @state when :init parse_response_line ln when :headers parse_header_line ln when :stream parse_stream_line ln end end def receive_error e @error_callback.call(e) if @error_callback end def parse_stream_line ln ln.strip! unless ln.empty? if ln[0,1] == '{' @each_item_callback.call(ln) if @each_item_callback end end end def parse_header_line ln ln.strip! if ln.empty? reset_timeouts @state = :stream else headers << ln end end def parse_response_line ln if ln =~ /\AHTTP\/1\.[01] ([\d]{3})/ @code = $1.to_i @state = :headers receive_error("invalid status code: #{@code}. #{ln}") unless @code == 200 else receive_error('invalid response') close_connection end end def reset_timeouts @nf_last_reconnect = @af_last_reconnect = nil @reconnect_retries = 0 end end end