lib/namespace.rb in inbox-0.16.1 vs lib/namespace.rb in inbox-0.17.0
- old
+ new
@@ -5,10 +5,13 @@
require 'draft'
require 'contact'
require 'file'
require 'calendar'
require 'event'
+require 'yajl'
+require 'em-http'
+require 'ostruct'
# Rather than saying require 'thread', we need to explicitly force
# the thread model to load. Otherwise, we can't reference it below.
# Thread still refers to the built-in Thread type, and Inbox::Thread
# is undefined.
@@ -85,28 +88,32 @@
"message" => Inbox::Message,
"namespace" => Inbox::Namespace,
"tag" => Inbox::Tag,
}
+ def _build_exclude_types(exclude_types)
+ exclude_string = "&exclude_types="
+
+ exclude_types.each do |value|
+ count = 0
+ if OBJECTS_TABLE.has_value?(value)
+ param_name = OBJECTS_TABLE.key(value)
+ exclude_string += "#{param_name},"
+ end
+ end
+
+ exclude_string = exclude_string[0..-2]
+ end
+
def deltas(cursor, exclude_types=[])
raise 'Please provide a block for receiving the delta objects' if !block_given?
exclude_string = ""
if exclude_types.any?
- exclude_string = "&exclude_types="
-
- exclude_types.each do |value|
- count = 0
- if OBJECTS_TABLE.has_value?(value)
- param_name = OBJECTS_TABLE.key(value)
- exclude_string += "#{param_name},"
- end
- end
+ exclude_string = _build_exclude_types(exclude_types)
end
- exclude_string = exclude_string[0..-2]
-
# loop and yield deltas until we've come to the end.
loop do
path = @_api.url_for_path("/n/#{@namespace_id}/delta?cursor=#{cursor}#{exclude_string}")
json = nil
@@ -138,9 +145,51 @@
end
end
break if start_cursor == end_cursor
cursor = end_cursor
+ end
+ end
+
+ def delta_stream(cursor, exclude_types=[], timeout=0)
+ raise 'Please provide a block for receiving the delta objects' if !block_given?
+
+ exclude_string = ""
+
+ if exclude_types.any?
+ exclude_string = _build_exclude_types(exclude_types)
+ end
+
+ # loop and yield deltas indefinitely.
+ path = @_api.url_for_path("/n/#{@namespace_id}/delta/streaming?cursor=#{cursor}#{exclude_string}")
+
+ parser = Yajl::Parser.new(:symbolize_keys => false)
+ parser.on_parse_complete = proc do |data|
+ delta = Inbox.interpret_response(OpenStruct.new(:code => '200'), data, {:expected_class => Object, :result_parsed => true})
+
+ cls = OBJECTS_TABLE[delta['object']]
+ obj = cls.new(@_api, @namespace_id)
+
+ case delta["event"]
+ when 'create', 'modify'
+ obj.inflate(delta['attributes'])
+ obj.cursor = delta["cursor"]
+ yield delta["event"], obj
+ when 'delete'
+ obj.id = delta["id"]
+ obj.cursor = delta["cursor"]
+ yield delta["event"], obj
+ end
+ end
+
+ EventMachine.run do
+ http = EventMachine::HttpRequest.new(path, :connect_timeout => 0, :inactivity_timeout => timeout).get(:keepalive => true)
+ http.stream do |chunk|
+ parser << chunk
+ end
+ http.errback do
+ raise UnexpectedResponse.new http.error
+ end
end
end
end
end