lib/seapig/client.rb in seapig-client-0.0.4 vs lib/seapig/client.rb in seapig-client-0.0.5
- old
+ new
@@ -43,11 +43,10 @@
@slave_objects.each_pair { |object_id, object|
@socket.send JSON.dump(action: 'object-consumer-register', id: object_id, latest_known_version: object.version)
}
@master_objects.each_pair { |object_id, object|
@socket.send JSON.dump(action: 'object-producer-register', pattern: object_id)
- object.upload(0, {})
}
@last_communication_at = Time.new.to_f
}
@socket.onmessage { |message|
@@ -61,46 +60,56 @@
when 'object-destroy'
@slave_objects.values.each { |object|
object.destroy(message) if object.matches?(message['id'])
}
when 'object-produce'
+ @master_objects[message['id']].onproduce_proc.call(message['id']) if @master_objects[message['id']].onproduce_proc
@master_objects[message['id']].upload(0,{}) if @master_objects[message['id']]
else
p :wtf, message
end
@last_communication_at = Time.new.to_f
}
@socket.onclose { |code, reason|
- puts 'Seapig connection died unexpectedly, reconnecting'
+ puts 'Seapig connection died unexpectedly (code:'+code.inspect+', reason:'+reason.inspect+'), reconnecting in 1s'
+ sleep 1
connect
}
+ @socket.onerror { |error|
+ puts 'Seapig error: '+error.inspect
+ }
+
@socket.onping {
@last_communication_at = Time.new.to_f
}
end
- def disconnect
+
+ def disconnect(detach_fd: false)
@connected = false
if @timeout_timer
@timeout_timer.cancel
@timeout_timer = nil
end
if @socket
@socket.onclose {}
- @socket.close
+ if detach_fd
+ IO.new(@socket.detach).close
+ else
+ @socket.close
+ end
@socket = nil
end
end
def detach_fd
- @connected = false
- IO.new(@socket.detach).close
+ disconnect(true)
end
def slave(object_id)
object = if object_id.include?('*') then SeapigWildcardObject.new(self, object_id) else SeapigObject.new(self, object_id) end
@@ -118,11 +127,11 @@
end
class SeapigObject < Hash
- attr_accessor :version, :object_id, :valid
+ attr_accessor :version, :object_id, :valid, :onproduce_proc, :stall
def matches?(id)
id =~ Regexp.new(Regexp.escape(@object_id).gsub('\*','.*?'))
end
@@ -130,20 +139,27 @@
def initialize(server, object_id)
@server = server
@object_id = object_id
@version = 0
- @onchange = nil
+ @onchange_proc = nil
+ @onproduce_proc = nil
@valid = false
@shadow = JSON.load(JSON.dump(self))
+ @stall = false
end
def onchange(&block)
- @onchange = block
+ @onchange_proc = block
end
+
+ def onproduce(&block)
+ @onproduce_proc = block
+ end
+
def patch(message)
if not message['old_version']
self.clear
elsif message['old_version'] == 0
@@ -154,14 +170,27 @@
exit 2
end
Hana::Patch.new(message['patch']).apply(self)
@version = message['new_version']
@valid = true
- @onchange.call(self) if @onchange
+ @onchange_proc.call(self) if @onchange_proc
end
+ def set(data, version)
+ if data
+ @stall = false
+ self.clear
+ self.merge!(data)
+ @shadow = sanitized
+ else
+ @stall = true
+ end
+ @version = version
+ end
+
+
def changed
old_version = @version
old_object = @shadow
@version += 1
@shadow = sanitized
@@ -178,11 +207,15 @@
message = {
id: @object_id,
action: 'object-patch',
old_version: old_version,
new_version: @version,
- patch: JsonDiff.generate(old_object, @shadow)
}
+ if old_version == 0 or @stall
+ message.merge!(value: (if @stall then false else @shadow end))
+ else
+ message.merge!(patch: JsonDiff.generate(old_object, @shadow))
+ end
@server.socket.send JSON.dump(message)
end