lib/seapig/client.rb in seapig-client-0.0.4 vs lib/seapig/client.rb in seapig-client-0.0.5

- old
+ new

@@ -43,11 +43,10 @@ @slave_objects.each_pair { |object_id, object| @socket.send JSON.dump(action: 'object-consumer-register', id: object_id, latest_known_version: object.version) } @master_objects.each_pair { |object_id, object| @socket.send JSON.dump(action: 'object-producer-register', pattern: object_id) - object.upload(0, {}) } @last_communication_at = Time.new.to_f } @socket.onmessage { |message| @@ -61,46 +60,56 @@ when 'object-destroy' @slave_objects.values.each { |object| object.destroy(message) if object.matches?(message['id']) } when 'object-produce' + @master_objects[message['id']].onproduce_proc.call(message['id']) if @master_objects[message['id']].onproduce_proc @master_objects[message['id']].upload(0,{}) if @master_objects[message['id']] else p :wtf, message end @last_communication_at = Time.new.to_f } @socket.onclose { |code, reason| - puts 'Seapig connection died unexpectedly, reconnecting' + puts 'Seapig connection died unexpectedly (code:'+code.inspect+', reason:'+reason.inspect+'), reconnecting in 1s' + sleep 1 connect } + @socket.onerror { |error| + puts 'Seapig error: '+error.inspect + } + @socket.onping { @last_communication_at = Time.new.to_f } end - def disconnect + + def disconnect(detach_fd: false) @connected = false if @timeout_timer @timeout_timer.cancel @timeout_timer = nil end if @socket @socket.onclose {} - @socket.close + if detach_fd + IO.new(@socket.detach).close + else + @socket.close + end @socket = nil end end def detach_fd - @connected = false - IO.new(@socket.detach).close + disconnect(true) end def slave(object_id) object = if object_id.include?('*') then SeapigWildcardObject.new(self, object_id) else SeapigObject.new(self, object_id) end @@ -118,11 +127,11 @@ end class SeapigObject < Hash - attr_accessor :version, :object_id, :valid + attr_accessor :version, :object_id, :valid, :onproduce_proc, :stall def matches?(id) id =~ Regexp.new(Regexp.escape(@object_id).gsub('\*','.*?')) end @@ -130,20 +139,27 @@ def initialize(server, object_id) @server = server @object_id = object_id @version = 0 - @onchange = nil + @onchange_proc = nil + @onproduce_proc = nil @valid = false @shadow = JSON.load(JSON.dump(self)) + @stall = false end def onchange(&block) - @onchange = block + @onchange_proc = block end + + def onproduce(&block) + @onproduce_proc = block + end + def patch(message) if not message['old_version'] self.clear elsif message['old_version'] == 0 @@ -154,14 +170,27 @@ exit 2 end Hana::Patch.new(message['patch']).apply(self) @version = message['new_version'] @valid = true - @onchange.call(self) if @onchange + @onchange_proc.call(self) if @onchange_proc end + def set(data, version) + if data + @stall = false + self.clear + self.merge!(data) + @shadow = sanitized + else + @stall = true + end + @version = version + end + + def changed old_version = @version old_object = @shadow @version += 1 @shadow = sanitized @@ -178,11 +207,15 @@ message = { id: @object_id, action: 'object-patch', old_version: old_version, new_version: @version, - patch: JsonDiff.generate(old_object, @shadow) } + if old_version == 0 or @stall + message.merge!(value: (if @stall then false else @shadow end)) + else + message.merge!(patch: JsonDiff.generate(old_object, @shadow)) + end @server.socket.send JSON.dump(message) end