lib/namespace.rb in inbox-0.16.1 vs lib/namespace.rb in inbox-0.17.0

- old
+ new

@@ -5,10 +5,13 @@ require 'draft' require 'contact' require 'file' require 'calendar' require 'event' +require 'yajl' +require 'em-http' +require 'ostruct' # Rather than saying require 'thread', we need to explicitly force # the thread model to load. Otherwise, we can't reference it below. # Thread still refers to the built-in Thread type, and Inbox::Thread # is undefined. @@ -85,28 +88,32 @@ "message" => Inbox::Message, "namespace" => Inbox::Namespace, "tag" => Inbox::Tag, } + def _build_exclude_types(exclude_types) + exclude_string = "&exclude_types=" + + exclude_types.each do |value| + count = 0 + if OBJECTS_TABLE.has_value?(value) + param_name = OBJECTS_TABLE.key(value) + exclude_string += "#{param_name}," + end + end + + exclude_string = exclude_string[0..-2] + end + def deltas(cursor, exclude_types=[]) raise 'Please provide a block for receiving the delta objects' if !block_given? exclude_string = "" if exclude_types.any? - exclude_string = "&exclude_types=" - - exclude_types.each do |value| - count = 0 - if OBJECTS_TABLE.has_value?(value) - param_name = OBJECTS_TABLE.key(value) - exclude_string += "#{param_name}," - end - end + exclude_string = _build_exclude_types(exclude_types) end - exclude_string = exclude_string[0..-2] - # loop and yield deltas until we've come to the end. loop do path = @_api.url_for_path("/n/#{@namespace_id}/delta?cursor=#{cursor}#{exclude_string}") json = nil @@ -138,9 +145,51 @@ end end break if start_cursor == end_cursor cursor = end_cursor + end + end + + def delta_stream(cursor, exclude_types=[], timeout=0) + raise 'Please provide a block for receiving the delta objects' if !block_given? + + exclude_string = "" + + if exclude_types.any? + exclude_string = _build_exclude_types(exclude_types) + end + + # loop and yield deltas indefinitely. + path = @_api.url_for_path("/n/#{@namespace_id}/delta/streaming?cursor=#{cursor}#{exclude_string}") + + parser = Yajl::Parser.new(:symbolize_keys => false) + parser.on_parse_complete = proc do |data| + delta = Inbox.interpret_response(OpenStruct.new(:code => '200'), data, {:expected_class => Object, :result_parsed => true}) + + cls = OBJECTS_TABLE[delta['object']] + obj = cls.new(@_api, @namespace_id) + + case delta["event"] + when 'create', 'modify' + obj.inflate(delta['attributes']) + obj.cursor = delta["cursor"] + yield delta["event"], obj + when 'delete' + obj.id = delta["id"] + obj.cursor = delta["cursor"] + yield delta["event"], obj + end + end + + EventMachine.run do + http = EventMachine::HttpRequest.new(path, :connect_timeout => 0, :inactivity_timeout => timeout).get(:keepalive => true) + http.stream do |chunk| + parser << chunk + end + http.errback do + raise UnexpectedResponse.new http.error + end end end end end