# This file is part of CPEE.
#
# CPEE is free software: you can redistribute it and/or modify it under the terms
# of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# CPEE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# CPEE (file COPYING in the main directory).  If not, see
# <http://www.gnu.org/licenses/>.

require 'json'
require 'pp'
require 'date'        # ValueHelper::generate references Date
require 'digest/md5'  # vote ids, callback ids and notification fingerprints
require ::File.dirname(__FILE__) + '/handler_properties'
require ::File.dirname(__FILE__) + '/handler_notifications'
require ::File.dirname(__FILE__) + '/callback'
require ::File.dirname(__FILE__) + '/empty_workflow'

module CPEE

  # Converts values between their Ruby representation and the textual
  # representation used when they are written to / read from the properties.
  class ValueHelper
    # Returns the textual form of +value+: Hash and Array are rendered as
    # JSON, everything else through #to_s.
    def self::generate(value)
      if [String, Integer, Float, TrueClass, FalseClass, Date].include? value.class
        value.to_s
      elsif [Hash, Array].include? value.class
        JSON::generate(value)
      elsif value.respond_to?(:to_s)
        value.to_s
      end
    end

    # Turns the string +value+ back into a Ruby value.
    #
    # 'true' / 'false' / 'nil' / 'null' are matched case-insensitively.
    # Everything else is tried as JSON first, then as Integer, then as Float;
    # if none of that works the text itself is returned.
    def self::parse(value)
      case value.downcase
        when 'true'
          true
        when 'false'
          false
        when 'nil', 'null'
          nil
        else
          begin
            JSON::parse(value)
          rescue
            # The trailing modifier-rescue covers the whole || chain and
            # yields '' should even value.to_s raise.
            (Integer value rescue nil) || (Float value rescue nil) || value.to_s rescue nil || ''
          end
      end
    end
  end

  # Controls one workflow instance: keeps the instance object, the properties
  # document and the notification subscriptions in sync, and delivers events
  # and votes to subscribers.
  class Controller

    # id::   identifier of the instance; also the name of its directory
    #        below opts[:instances].
    # opts:: option hash. Keys read in this class: :instances, :url,
    #        :properties_schema_inactive, :properties_schema_active,
    #        :properties_schema_finished, :properties_init, :topics,
    #        :transformation_service, :transformation_dslx, :xmpp.
    def initialize(id,opts)
      @directory = opts[:instances] + "/#{id}/"
      @id = id
      @events = {}         # "topic/event" => { subscription key => url or websocket }
      @votes = {}          # "topic/vote"  => { subscription key => url or websocket }
      @votes_results = {}  # vote id => collected results
      @communication = {}  # subscription key => url or websocket
      @callbacks = {}
      @instance = EmptyWorkflow.new(self,opts[:url])
      @positions = []
      @thread = nil
      @mutex = Mutex.new
      @opts = opts
      @properties = Riddl::Utils::Properties::Backend.new(
        {
          :inactive => opts[:properties_schema_inactive],
          :active   => opts[:properties_schema_active],
          :finished => opts[:properties_schema_finished]
        },
        @directory + '/properties.xml',
        opts[:properties_init]
      )
      @notifications = Riddl::Utils::Notifications::Producer::Backend.new(
        opts[:topics],
        @directory + '/notifications/'
      )
      @notifications.subscriptions.keys.each do |key|
        self.unserialize_notifications!(:cre,key)
      end
      # Any persisted state other than stopped/ready/finished is reset to
      # 'stopped' when the controller is created.
      unless ['stopped','ready','finished'].include?(@properties.data.find("string(/p:properties/p:state)"))
        @properties.modify do |doc|
          doc.find("/p:properties/p:state").first.text = 'stopped'
        end
      end
      unserialize_handlerwrapper!
      unserialize_dataelements!
      unserialize_endpoints!
      unserialize_dsl!
      unserialize_positions!
    end

    attr_reader :id
    attr_reader :properties
    attr_reader :notifications
    attr_reader :callbacks
    attr_reader :mutex

    # URL of this instance: opts[:url] followed by the instance id.
    def instance_url
      "#{@opts[:url]}/#{@id}"
    end

    # Waits for a still running thread, then starts a simulation run.
    def sim # {{{
      @thread.join if !@thread.nil? && @thread.alive?
      @thread = @instance.sim
    end # }}}

    # Waits for a still running thread, hands the stored positions (if any)
    # to the instance via #search and starts the instance.
    def start # {{{
      @thread.join if !@thread.nil? && @thread.alive?
      unless @positions.empty?
        @instance.search(@positions)
      end
      @thread = @instance.start
    end # }}}

    # Stops the instance, answers all pending vote callbacks with nil and
    # waits for the execution thread to end.
    def stop # {{{
      t = @instance.stop
      t.run
      @callbacks.delete_if do |k,c|
        # only remove vote_callbacks, the other stuff is removed by
        # the instance stopping cleanup
        if c.method == :vote_callback
          c.callback(nil)
          true
        else
          false
        end
      end
      @thread.join if !@thread.nil? && @thread.alive?
      # NOTE(review): this assigns @callback (singular), which is read nowhere
      # in this file; @callbacks is the Hash used everywhere else. Possibly a
      # typo - left untouched because resetting @callbacks would change behaviour.
      @callback = [] # everything should be empty now
    end # }}}

    # Writes the data elements of the instance into the properties.
    def serialize_dataelements! #{{{
      @properties.modify do |doc|
        node = doc.find("/p:properties/p:dataelements").first
        node.children.delete_all!
        @instance.data.each do |k,v|
          node.add(k.to_s,ValueHelper::generate(v))
        end
      end
    end #}}}

    # Writes the endpoints of the instance into the properties.
    def serialize_endpoints! #{{{
      @properties.modify do |doc|
        node = doc.find("/p:properties/p:endpoints").first
        node.children.delete_all!
        @instance.endpoints.each do |k,v|
          node.add(k.to_s,v)
        end
      end
    end #}}}

    # Activates the properties schema that belongs to the current instance
    # state. The state text itself is only written for :finished, :stopped
    # and :ready.
    def serialize_state! # {{{
      @properties.activate_schema(:finished) if @instance.state == :finished
      @properties.activate_schema(:inactive) if @instance.state == :stopped || @instance.state == :ready
      @properties.activate_schema(:active)   if @instance.state == :running || @instance.state == :simulating
      if [:finished, :stopped, :ready].include?(@instance.state)
        @properties.modify do |doc|
          node = doc.find("/p:properties/p:state").first
          node.text = @instance.state
        end
      end
    end # }}}

    # Remembers the current positions of the instance and writes them into
    # the properties as "detail" or "detail;passthrough".
    def serialize_positions! # {{{
      @properties.modify do |doc|
        pos = doc.find("/p:properties/p:positions").first
        pos.children.delete_all!
        @positions = @instance.positions
        @instance.positions.each do |p|
          pos.add("#{p.position}",[p.detail,p.passthrough].compact.join(';'))
        end
      end
    end # }}}

    # Writes status id and status message of the instance into the properties.
    def serialize_status! #{{{
      @properties.modify do |doc|
        node = doc.find("/p:properties/p:status/p:id").first
        node.text = @instance.status.id
        node = doc.find("/p:properties/p:status/p:message").first
        node.text = @instance.status.message
      end
    end #}}}

    # Brings @communication, @events and @votes in line with the subscription
    # identified by +key+.
    #
    # op:: :cre (subscription created), :upd (subscription changed) or
    #      :del (subscription deleted).
    def unserialize_notifications!(op,key)# {{{
      case op
        when :del
          @communication.delete(key)
          @events.each do |eve,keys|
            keys.delete_if{|k,v| key == k}
          end
          @votes.each do |eve,keys|
            keys.delete_if do |k,v|
              if key == k
                @callbacks.each{|voteid,cb|cb.delete_if!(eve,k)}
                true
              end
            end
          end
        when :upd
          if @notifications.subscriptions[key]
            url = @communication[key]
            # Start with all known event/vote names; every name that is still
            # part of the subscription is struck off below.
            evs = []
            vos = []
            @events.each { |e,v| evs << e }
            @votes.each { |e,v| vos << e }
            @notifications.subscriptions[key].view do |doc|
              turl = doc.find('string(/n:subscription/@url)')
              # without a url attribute the previously known target is kept
              url = turl == '' ? url : turl
              @communication[key] = url
              doc.find('/n:subscription/n:topic').each do |t|
                t.find('n:event').each do |e|
                  @events["#{t.attributes['id']}/#{e}"] ||= {}
                  @events["#{t.attributes['id']}/#{e}"][key] = url
                  evs.delete("#{t.attributes['id']}/#{e}")
                end
                t.find('n:vote').each do |e|
                  @votes["#{t.attributes['id']}/#{e}"] ||= {}
                  @votes["#{t.attributes['id']}/#{e}"][key] = url
                  vos.delete("#{t.attributes['id']}/#{e}")
                end
              end
            end
            # Whatever is left is no longer subscribed to by key. Event names
            # have to be removed from @events (they were collected from there);
            # looking them up in @votes hits the wrong table and may be nil.
            evs.each { |e| @events[e].delete(key) }
            vos.each do |e|
              @callbacks.each{|voteid,cb|cb.delete_if!(e,key)}
              @votes[e].delete(key)
            end
          end
        when :cre
          @notifications.subscriptions[key].view do |doc|
            turl = doc.find('string(/n:subscription/@url)')
            url = turl == '' ? nil : turl
            @communication[key] = url
            doc.find('/n:subscription/n:topic').each do |t|
              t.find('n:event').each do |e|
                @events["#{t.attributes['id']}/#{e}"] ||= {}
                @events["#{t.attributes['id']}/#{e}"][key] = (url == "" ? nil : url)
              end
              t.find('n:vote').each do |e|
                @votes["#{t.attributes['id']}/#{e}"] ||= {}
                @votes["#{t.attributes['id']}/#{e}"][key] = url
              end
            end
          end
      end
    end # }}}

    # Replaces the data elements of the instance with the ones stored in the
    # properties.
    def unserialize_dataelements! #{{{
      @instance.data.clear
      @properties.data.find("/p:properties/p:dataelements/p:*").each do |e|
        @instance.data[e.qname.to_sym] = ValueHelper::parse(e.text)
      end
    end #}}}

    # Replaces the endpoints of the instance with the ones stored in the
    # properties.
    def unserialize_endpoints! #{{{
      @instance.endpoints.clear
      @properties.data.find("/p:properties/p:endpoints/p:*").each do |e|
        @instance.endpoints[e.qname.to_sym] = e.text
      end
    end #}}}

    # Reads the state from the properties and puts the vote
    # "properties/state/change" to the subscribers. When the vote passes the
    # instance is stopped / started / simulated accordingly; otherwise the
    # state text is set back ('stopping' => 'running', 'running' => 'stopped').
    def unserialize_state! #{{{
      state = @properties.data.find("string(/p:properties/p:state)")
      if call_vote("properties/state/change", :instance => @id, :newstate => state)
        case state
          when 'stopping'
            stop
          when 'running'
            start
          when 'simulating'
            sim
        end
      else
        if node = @properties.data.find("/p:properties/p:state").first
          case state
            when 'stopping'; node.text = 'running'
            when 'running';  node.text = 'stopped'
          end
        end
      end
    end #}}}

    # Sets the handler wrapper of the instance to the class named in the
    # properties, falling back to DefaultHandlerWrapper when that fails. The
    # properties are updated when the fallback (or a differing value) is used.
    def unserialize_handlerwrapper! #{{{
      hw = nil
      begin
        # SECURITY NOTE(review): eval executes whatever text is stored in the
        # handlerwrapper property. If that property can be written by
        # untrusted clients this is arbitrary code execution - consider a
        # constant lookup restricted to known wrappers instead.
        hw = eval(@properties.data.find("string(/p:properties/p:handlerwrapper)"))
        @instance.handlerwrapper = hw
      rescue => e
        @instance.handlerwrapper = DefaultHandlerWrapper
      end
      if hw != @instance.handlerwrapper
        @properties.modify do |doc|
          node = doc.find("/p:properties/p:handlerwrapper").first
          node.text = @instance.handlerwrapper.to_s
        end
      end
    end #}}}

    # Rebuilds @positions from the properties. The element name is the
    # position, the text holds "detail" or "detail;passthrough".
    def unserialize_positions! #{{{
      @positions = []
      @properties.data.find("/p:properties/p:positions/p:*").each do |e|
        val = e.text.split(';')
        @positions << ::WEEL::Position.new(e.qname.to_s.to_sym,val[0].to_sym,val[1])
      end
    end #}}}

    # Hands the dsl stored in the properties to the instance.
    def unserialize_dsl! #{{{
      @instance.description = @properties.data.find("string(/p:properties/p:dsl)")
    end #}}}

    # Transforms the description into dslx and dsl, and optionally extracts
    # data elements and endpoints from it, as configured in
    # /p:properties/p:transformation.
    #
    # Returns an array of [topic, content] pairs describing the changes that
    # were made to data elements and endpoints.
    def unserialize_description! #{{{
      dsl = nil
      nots = []
      @properties.modify do |doc|
        dsl   = doc.find("/p:properties/p:dsl").first
        dslx  = doc.find("/p:properties/p:dslx").first
        desc  = doc.find("/p:properties/p:description").first
        tdesc = doc.find("/p:properties/p:transformation/p:description").first
        tdata = doc.find("/p:properties/p:transformation/p:dataelements").first
        tendp = doc.find("/p:properties/p:transformation/p:endpoints").first

        ### description transformation, including dslx to dsl
        addit = if tdesc.attributes['type'] == 'copy' || tdesc.empty?
          desc.children.first.to_doc.root
        elsif tdesc.attributes['type'] == 'rest' && !tdesc.empty?
          srv = Riddl::Client.interface(tdesc.text,@opts[:transformation_service],:xmpp => @opts[:xmpp])
          status, res = srv.post [
            Riddl::Parameter::Complex.new("description","text/xml",desc.children.first.dump),
            Riddl::Parameter::Simple.new("type","description")
          ]
          XML::Smart::string(res[0].value.read).root if status == 200
        elsif tdesc.attributes['type'] == 'xslt' && !tdesc.empty?
          trans = XML::Smart::open_unprotected(tdesc.text)
          desc.children.first.to_doc.transform_with(trans).root
        else
          nil
        end
        unless addit.nil?
          dslx.children.delete_all!
          dslx.add addit
          trans = XML::Smart::open_unprotected(@opts[:transformation_dslx])
          dsl.text = dslx.to_doc.transform_with(trans)
          @instance.description = dsl.text
        end

        ### dataelements extraction
        addit = if tdata.attributes['type'] == 'rest' && !tdata.empty?
          srv = Riddl::Client.interface(tdata.text,@opts[:transformation_service],:xmpp => @opts[:xmpp])
          status, res = srv.post [
            Riddl::Parameter::Complex.new("description","text/xml",desc.children.first.dump),
            Riddl::Parameter::Simple.new("type","dataelements")
          ]
          res
        elsif tdata.attributes['type'] == 'xslt' && !tdata.empty?
          trans = XML::Smart::open_unprotected(tdata.text)
          desc.children.first.to_doc.transform_with(trans)
        else
          nil
        end
        unless addit.nil?
          node = doc.find("/p:properties/p:dataelements").first
          node.children.delete_all!
          @instance.data.clear
          # the result is consumed as alternating name / value entries
          addit.each_slice(2).each do |k,v|
            # NOTE(review): keys are stored as given here, whereas
            # unserialize_dataelements! stores symbols - confirm which is intended.
            @instance.data[k.value] = ValueHelper::parse(v.value)
            node.add(k.value,ValueHelper::generate(v.value))
          end
          nots << ["properties/dataelements/change", {:instance => instance_url, :changed => JSON::generate(@instance.data)}]
        end

        ### endpoints extraction
        # The emptiness check has to look at the endpoints transformation
        # (tendp), not at the dataelements transformation.
        addit = if tendp.attributes['type'] == 'rest' && !tendp.empty?
          srv = Riddl::Client.interface(tendp.text,@opts[:transformation_service],:xmpp => @opts[:xmpp])
          status, res = srv.post [
            Riddl::Parameter::Complex.new("description","text/xml",desc.children.first.dump),
            Riddl::Parameter::Simple.new("type","endpoints")
          ]
          res
        elsif tendp.attributes['type'] == 'xslt' && !tendp.empty?
          trans = XML::Smart::open_unprotected(tendp.text)
          desc.children.first.to_doc.transform_with(trans)
        else
          nil
        end
        unless addit.nil?
          node = doc.find("/p:properties/p:endpoints").first
          node.children.delete_all!
          @instance.endpoints.clear
          addit.each_slice(2).each do |k,v|
            @instance.endpoints[k.value.to_sym] = ValueHelper::parse(v.value)
            node.add(k.value,ValueHelper::generate(v.value))
          end
          nots << ["properties/endpoints/change", {:instance => instance_url, :changed => JSON::generate(@instance.endpoints)}]
        end
      end
      nots
    end #}}}

    # Sends the event +what+ ("topic/event") with +content+ to every
    # subscriber of that event, one thread per subscriber. Delivery errors
    # are ignored.
    def notify(what,content={})# {{{
      item = @events[what]
      if item
        item.each do |ke,ur|
          Thread.new(ke,ur) do |key,url|
            ev = build_notification(key,what,content,'event')
            if url.class == String
              client = Riddl::Client.new(url,nil,:xmpp => @opts[:xmpp])
              client.post ev.map{|k,v|Riddl::Parameter::Simple.new(k,v)} rescue nil
            elsif url.class == Riddl::Utils::Notifications::Producer::WS
              # The document needs a root element to attach the fields to;
              # an empty string has none.
              e = XML::Smart::string("<event/>")
              ev.each do |k,v|
                e.root.add(k,v)
              end
              url.send(e.to_s) rescue nil
            end
          end
        end
      end
    end # }}}

    # Puts the vote +what+ ("topic/vote") with +content+ to all subscribers
    # of that vote and waits until the expected number of answers arrived.
    #
    # Returns true when nobody is subscribed, otherwise true unless at least
    # one collected result is false.
    def call_vote(what,content={})# {{{
      voteid = Digest::MD5.hexdigest(Kernel::rand().to_s)
      item = @votes[what]
      if item && item.length > 0
        continue = WEEL::Continue.new
        @votes_results[voteid] = []
        # number of answers to wait for: all http subscribers plus all
        # websockets that are not closed
        inum = 0
        item.each do |key,url|
          if url.class == String
            inum += 1
          elsif url.class == Riddl::Utils::Notifications::Producer::WS
            inum += 1 unless url.closed?
          end
        end
        # NOTE(review): when inum stays 0 (only nil targets or closed
        # websockets) nothing below appears to be able to satisfy
        # vote_callback's count check, so continue.wait may never return -
        # confirm against WEEL::Continue / Callback.

        item.each do |key,url|
          Thread.new(key,url,content.dup) do |k,u,c|
            callback = Digest::MD5.hexdigest(Kernel::rand().to_s)
            c['callback'] = callback
            notf = build_notification(k,what,c,'vote',callback)
            if u.class == String
              client = Riddl::Client.new(u)
              params = notf.map{|ke,va|Riddl::Parameter::Simple.new(ke,va)}
              @mutex.synchronize do
                status, result, headers = client.post params
                if headers["CPEE_CALLBACK"] && headers["CPEE_CALLBACK"] == 'true'
                  # the subscriber answers later through the callback
                  @callbacks[callback] = Callback.new("vote #{notf.find{|a,b| a == 'notification'}[1]}", self, :vote_callback, what, k, :http, continue, voteid, callback, inum)
                else
                  vote_callback(result,continue,voteid,callback, inum)
                end
              end
            elsif u.class == Riddl::Utils::Notifications::Producer::WS
              @callbacks[callback] = Callback.new("vote #{notf.find{|a,b| a == 'notification'}[1]}", self, :vote_callback, what, k, :ws, continue, voteid, callback, inum)
              # root element required for e.root.add
              e = XML::Smart::string("<vote/>")
              notf.each do |ke,va|
                e.root.add(ke,va)
              end
              u.send(e.to_s)
            end
          end
        end
        continue.wait

        !@votes_results.delete(voteid).include?(false)
      else
        true
      end
    end # }}}

    # Records one answer for the vote +voteid+ and releases the waiting
    # call_vote once +num+ answers have been collected. A result of :DELETE
    # counts as approval; otherwise the first result entry has to carry the
    # value 'true'.
    def vote_callback(result,continue,voteid,callback,num)# {{{
      @callbacks.delete(callback)
      if result == :DELETE
        @votes_results[voteid] << true
      else
        @votes_results[voteid] << (result && result[0] && result[0].value == 'true')
      end
      if (num == @votes_results[voteid].length)
        continue.continue
      end
    end # }}}

    # Registers +socket+ as the target of subscription +key+ for everything
    # that subscription already listens to.
    def add_ws(key,socket)# {{{
      @communication[key] = socket
      @events.each do |a|
        if a[1].has_key?(key)
          a[1][key] = socket
        end
      end
      @votes.each do |a|
        if a[1].has_key?(key)
          a[1][key] = socket
        end
      end
    end # }}}

    # Clears the target of subscription +key+ (set to nil); the subscription
    # entries themselves are kept.
    def del_ws(key)# {{{
      @communication[key] = nil
      @events.each do |a|
        if a[1].has_key?(key)
          a[1][key] = nil
        end
      end
      @votes.each do |a|
        if a[1].has_key?(key)
          a[1][key] = nil
        end
      end
    end # }}}

    private

      # Builds the list of [name, value] pairs that make up a notification.
      # +what+ is split into topic (dirname) and event/vote name (basename);
      # the last pair is an MD5 fingerprint over all preceding pairs.
      def build_notification(key,what,content,type,callback=nil)# {{{
        res = []
        res << ['key'                             , key]
        res << ['topic'                           , ::File::dirname(what)]
        res << [type                              , ::File::basename(what)]
        res << ['notification'                    , ValueHelper::generate(content)]
        res << ['callback'                        , callback] unless callback.nil?
        res << ['fingerprint-with-consumer-secret', Digest::MD5.hexdigest(res.join(''))]
        # TODO add secret to fp
      end # }}}

  end

end