#!/bin/env ruby require 'websocket-eventmachine-server' require 'json' require 'jsondiff' require 'hana' require 'set' class String def starexp Regexp.new(Regexp.escape(self).gsub('\*','.*?')) end def starexp? self.include?('*') end end class SeapigObject attr_reader :id, :version, :valid @@objects_by_id = Hash.new { |hash,object_id| puts "Creating: "+(object = SeapigObject.new(object_id)).id hash[object_id] = object } def self.[](id) @@objects_by_id[id] end def self.all @@objects_by_id.values end def self.matching(id) @@objects_by_id.values.select { |object| object.matches?(id) } end def matches?(id) @id =~ id.starexp end def initialize(id) @id = id @valid = false @object = {} @version = 0 @@objects_by_id[@id] = self end def self.gc used_object_ids = Set.new Client.all.each { |client| used_object_ids.merge(client.consumes.map { |object| object.id }) } #objects with direct consuments (no pattern matching) @@objects_by_id.values.each { |object| used_object_ids << object.id if Client.all.find { |client| client.produces.include?(object.id) } and Client.all.find { |client| client.consumes.find { |consumed| consumed.id.starexp? and object.id =~ consumed.id.starexp } } } #objects having producers AND wildcard consuments @@objects_by_id.values.each { |object| object.version.keys.each { |key| used_object_ids << key } if object.version.kind_of?(Hash) } # objects that others depend on @@objects_by_id.keys.select { |id| not used_object_ids.include?(id) }.each { |id| Client.all.select { |client| client.consumes.find { |object| object.id.starexp? and id =~ object.id.starexp } }.each { |client| puts "Destroying: "+id client.socket.send JSON.dump(action: 'object-destroy', id: id) } puts "Deleting: "+id @@objects_by_id.delete(id) } end def patch(patch, value, from_version, new_version) print "Patching:\n\tversion:"+@version.inspect+"\n\tfrom_version: "+from_version.inspect+"\n\tnew_version: "+new_version.inspect+"\n\tpatch_size: "+(patch and patch.size.to_s or "nil")+"\n\t--> " if from_version == @version or from_version == 0 puts 'Clean' old_object = JSON.load(JSON.dump(@object)) old_version = @version @object.clear if from_version == 0 or value begin Hana::Patch.new(patch).apply(@object) if patch rescue Exception => e puts "Patching failed!\n\tOld object: "+old_object.inspect+"\n\tPatch: "+patch.inspect raise e end @object.merge!(value) if value @version = new_version Client.all.each { |client| send(client, old_version, old_object) } SeapigObject.all.each { |object| object.check_validity } elsif from_version > @version puts "Lost some updates, reinitializing object" @version = 0 @object.clear @valid = false else puts "Late update, ignoring" end end def send(client, old_version, old_object, patch = nil) return false if not client.consumes.find { |object| (object == self) or self.matches?(object.id) } puts 'Sending '+self.id client.socket.send JSON.dump( action: 'object-update', id: @id, old_version: old_version, new_version: @version, patch: (patch or JsonDiff.generate(old_object, @object))) end def check_validity @valid = if not @version.kind_of?(Hash) then (@version > 0) else not @version.to_a.find { |dependency_id, dependency_version| SeapigObject[dependency_id] and SeapigObject[dependency_id].version and SeapigObject[dependency_id].version > dependency_version } end end def inspect ''%[@id, @version, (@valid and 'V' or 'I')] end end class Client attr_reader :produces, :consumes, :socket, :producing, :index attr_accessor :options @@clients_by_socket = {} @@count = 0 def self.[](id) @@clients_by_socket[id] end def self.all @@clients_by_socket.values end def initialize(socket) @index = @@count += 1 puts 'Creating client: '+@index.to_s @socket = socket @options = {} @produces = [] @consumes = [] @producing = nil @@clients_by_socket[socket] = self end def id (@options['name'] or "") + ':' + @index.to_s end def destroy puts 'Destroying client: '+@index.to_s @@clients_by_socket.delete(@socket) SeapigObject.gc Client.all.find { |client| client.assign(@producing) } if @producing and SeapigObject.all.include?(@producing) end def producer_register(pattern) @produces.push(pattern) SeapigObject.all.each { |object| self.assign(object) } end def consumer_register(object) @consumes.push(object) Client.all.each { |client| client.produces.each { |pattern| SeapigObject[pattern] if (not pattern.starexp?) and (pattern =~ object.id.starexp) } } if object.id.starexp? SeapigObject.matching(object.id).each { |object| Client.all.find { |client| client.assign(object) } object.send(self,0,{}) if object.valid } end def consumer_unregister(object) @consumes.delete(object) SeapigObject.gc end def assign(object) puts 'Assign? %20s <> %-30s - %s'%[self.id, object.id, [object.valid,object.id.starexp?,@producing,Client.all.find { |client| client.producing == object },(not @produces.find { |pattern| object.id =~ pattern.starexp })].map { |b| b and 'T' or 'F' }.join('')] return true if object.valid return true if object.id.starexp? return false if @producing return true if Client.all.find { |client| client.producing == object } return false if not @produces.find { |pattern| object.id =~ pattern.starexp } puts 'Assigning: '+object.id+' to: '+self.id @socket.send JSON.dump(action: 'object-produce', id: object.id) @producing = object end def release(object) puts 'Releasing: '+object.id+' from: '+self.id @producing = nil if @producing == object end def ping @socket.ping end def pong @pong_time = Time.new end def check_ping_timeout @socket.close if Time.new - pong > 60 end end EM.run { WebSocket::EventMachine::Server.start(host: "0.0.0.0", port: 3001) { |client_socket| client_socket.onmessage { |message| client = Client[client_socket] message = JSON.load message puts "-"*80 + ' ' + Time.new.to_s + "\nMessage: %-20s %-30s %s:%s"%[client.id, message['action'], (message["id"] and "ID" or "Pattern"),(message["id"] or message["pattern"])] object = SeapigObject[message['id']] if message['id'] case message['action'] when 'object-producer-register' fail unless message['pattern'] client.producer_register(message['pattern']) when 'object-producer-unregister' fail unless message['pattern'] client.producer_unregister(message['pattern']) when 'object-patch' fail unless message['id'] and message['new_version'] and message['old_version'] SeapigObject.gc if SeapigObject.all.include?(object) # ignoring objects nobody listens to object.patch(message['patch'], message['value'], message['old_version'], message['new_version']) client.release(object) SeapigObject.all.each { |object| Client.all.find { |client| client.assign(object) } } end when 'object-consumer-register' fail unless message['id'] client.consumer_register(object) when 'object-consumer-unregister' fail unless message['id'] client.consumer_unregister(object) when 'client-options-set' fail unless message['options'] client.options = message['options'] else puts '***** WTF, got message with action: ' + message['action'].inspect end puts "Clients:\n"+Client.all.map { |client| "\t%-20s produces:%s consumes:%s"%[client.id,client.produces.inspect,client.consumes.map { |obj| obj.id }] }.join("\n")+"\n" puts "Objects:\n"+SeapigObject.all.map { |object| "\t%s"%[object.inspect] }.join("\n")+"\n" } client_socket.onopen { Client.new(client_socket) } client_socket.onclose { Client[client_socket].destroy if Client[client_socket] } client_socket.onpong { Client[client_socket].pong } } Socket.open(:UNIX, :DGRAM) { |s| s.connect(Socket.pack_sockaddr_un(ENV['NOTIFY_SOCKET'])); s.sendmsg "READY=1" } if ENV['NOTIFY_SOCKET'] EM.add_periodic_timer(10) { Client.all.each { |client| client.ping } } EM.add_periodic_timer(10) { Client.all.each { |client| client.check_ping_timeout } } }