# This file is part of CPEE. # # CPEE is free software: you can redistribute it and/or modify it under the terms # of the GNU General Public License as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # # CPEE is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A # PARTICULAR PURPOSE. See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along with # CPEE (file COPYING in the main directory). If not, see # . require 'fileutils' require 'redis' require 'riddl/server' require 'riddl/client' require_relative 'redis' require_relative 'message' require_relative 'persistence' require_relative 'statemachine' require_relative 'implementation_properties' require_relative 'implementation_notifications' require_relative 'implementation_callbacks' module CPEE SERVER = File.expand_path(File.join(__dir__,'..','cpee.xml')) PROPERTIES_PATHS_FULL = %w{ /p:properties/p:handlerwrapper /p:properties/p:positions/p:* /p:properties/p:positions/p:*/@* /p:properties/p:dataelements/p:* /p:properties/p:endpoints/p:* /p:properties/p:attributes/p:* /p:properties/p:transformation/p:* /p:properties/p:transformation/p:*/@* /p:properties/p:description /p:properties/p:dslx /p:properties/p:dsl /p:properties/p:status/p:id /p:properties/p:status/p:message /p:properties/p:state/@changed /p:properties/p:state } PROPERTIES_PATHS_INDEX_UNORDERED = %w{ /p:properties/p:positions/p:* } PROPERTIES_PATHS_INDEX_ORDERED = %w{ /p:properties/p:dataelements/p:* /p:properties/p:endpoints/p:* /p:properties/p:attributes/p:* } def self::implementation(opts) opts[:instances] ||= File.expand_path(File.join(__dir__,'..','..','server','instances')) opts[:global_handlerwrappers] ||= File.expand_path(File.join(__dir__,'..','..','server','handlerwrappers')) opts[:handlerwrappers] ||= '' opts[:topics] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','topics.xml')) opts[:properties_init] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','properties.init')) opts[:properties_empty] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','properties.empty')) opts[:transformation_dslx] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','transformation_dslx.xsl')) opts[:transformation_service] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','transformation.xml')) opts[:empty_dslx] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','empty_dslx.xml')) opts[:notifications_init] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','notifications')) opts[:states] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','states.xml')) opts[:backend_run] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','backend','run')) opts[:backend_template] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','backend','instance.template')) opts[:backend_opts] ||= 'opts.yaml' opts[:watchdog_frequency] ||= 7 opts[:watchdog_start_off] ||= false opts[:backend_instance] ||= 'instance.rb' opts[:infinite_loop_stop] ||= 10000 ### set redis_cmd to nil if you want to do global ### at least redis_path or redis_url and redis_db have to be set if you do global opts[:redis_path] ||= 'redis.sock' # use e.g. /tmp/redis.sock for global stuff. Look it up in your redis config opts[:redis_db] ||= 0 ### optional redis stuff opts[:redis_url] ||= nil opts[:redis_cmd] ||= 'redis-server --port 0 --unixsocket #redis_path# --unixsocketperm 600 --pidfile #redis_pid# --dir #redis_db_dir# --dbfilename #redis_db_name# --databases 1 --save 900 1 --save 300 10 --save 60 10000 --rdbcompression yes --daemonize yes' opts[:redis_pid] ||= 'redis.pid' # use e.g. /var/run/redis.pid if you do global. Look it up in your redis config opts[:redis_db_name] ||= 'redis.rdb' # use e.g. /var/lib/redis.rdb for global stuff. Look it up in your redis config CPEE::redis_connect opts opts[:sse_keepalive_frequency] ||= 10 opts[:sse_connections] = {} opts[:statemachine] = CPEE::StateMachine.new opts[:states], %w{running simulating replaying finishing stopping abandoned finished} do |id| opts[:redis].get("instance:#{id}/state") end opts[:runtime_cmds] << [ "startclean", "Delete instances before starting.", Proc.new { |status| Dir.glob(File.expand_path(File.join(opts[:instances],'*'))).each do |d| FileUtils.rm_r(d) if File.basename(d) =~ /^\d+$/ end } ] Proc.new do parallel do CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db]) EM.add_periodic_timer(opts[:watchdog_frequency]) do ### start services CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db]) end EM.defer do ### catch all sse connections CPEE::Notifications::sse_distributor(opts) end EM.add_periodic_timer(opts[:sse_keepalive_frequency]) do CPEE::Notifications::sse_heartbeat(opts) end end cleanup do CPEE::cleanup_services(opts[:watchdog_start_off]) end interface 'main' do run CPEE::Instances, opts if get '*' run CPEE::NewInstance, opts if post 'instance-new' on resource '\d+' do |r| run CPEE::Info, opts if get run CPEE::DeleteInstance, opts if delete end end interface 'properties' do |r| id = r[:h]['RIDDL_DECLARATION_PATH'].split('/')[1].to_i use CPEE::Properties::implementation(id.to_i, opts) end interface 'notifications' do |r| id = r[:h]['RIDDL_DECLARATION_PATH'].split('/')[1].to_i use CPEE::Notifications::implementation(id.to_i, opts) end interface 'callbacks' do |r| id = r[:h]['RIDDL_DECLARATION_PATH'].split('/')[1].to_i use CPEE::Callbacks::implementation(id.to_i, opts) end end end def self::watch_services(watchdog_start_off,url,path,db) return if watchdog_start_off EM.defer do Dir[File.join(__dir__,'..','..','server','routing','*.rb')].each do |s| s = s.sub(/\.rb$/,'') pid = (File.read(s + '.pid').to_i rescue nil) if (pid.nil? || !(Process.kill(0, pid) rescue false)) && !File.exist?(s + '.lock') if url.nil? system "#{s}.rb -p \"#{path}\" -d #{db} restart 1>/dev/null 2>&1" else system "#{s}.rb -u \"#{url}\" -d #{db} restart 1>/dev/null 2>&1" end puts "➡ Service #{File.basename(s,'.rb')} started ..." end end end end def self::cleanup_services(watchdog_start_off) return if watchdog_start_off Dir[File.join(__dir__,'..','..','server','routing','*.rb')].each do |s| s = s.sub(/\.rb$/,'') pid = (File.read(s + '.pid').to_i rescue nil) if !pid.nil? || (Process.kill(0, pid) rescue false) system "#{s}.rb stop 1>/dev/null 2>&1" puts "➡ Service #{File.basename(s,'.rb')} stopped ..." end end end class Instances < Riddl::Implementation #{{{ def response redis = @a[0][:redis] Riddl::Parameter::Complex.new("wis","text/xml") do ins = XML::Smart::string('') redis.zrevrange('instances',0,-1).each do |instance| statekey = "instance:#{instance}/state" attributes = "instance:#{instance}/attributes/" info = redis.get(attributes + 'info') uuid = redis.get(attributes + 'uuid') state = redis.get(statekey) state_changed = redis.get(File.join(statekey,'@changed')) ins.root.add('instance', info, 'uuid' => uuid, 'id' => instance, 'state' => state, 'state_changed' => state_changed ) end ins.to_s end end end #}}} class NewInstance < Riddl::Implementation #{{{ def path(e) ret = [] until e.qname.name == 'properties' ret << (e.class == XML::Smart::Dom::Attribute ? '@' : '') + e.qname.name e = e.parent end File.join(*ret.reverse) end def response opts = @a[0] redis = opts[:redis] doc = XML::Smart::open_unprotected(opts[:properties_init]) doc.register_namespace 'p', 'http://cpee.org/ns/properties/2.0' name = @p[0].value id = redis.zrevrange('instances', 0, 0).first.to_i + 1 uuid = SecureRandom.uuid instance = 'instance:' + id.to_s redis.multi do |multi| multi.zadd('instances',id,id) doc.root.find(PROPERTIES_PATHS_FULL.join(' | ')).each do |e| if e.class == XML::Smart::Dom::Element && e.element_only? val = e.find('*').map { |f| f.dump }.join multi.set(File.join(instance, path(e)), val) else multi.set(File.join(instance, path(e)), e.text) end end doc.root.find(PROPERTIES_PATHS_INDEX_UNORDERED.join(' | ')).each do |e| p = path(e) multi.sadd(File.join(instance, File.dirname(p)), File.basename(p)) end doc.root.find(PROPERTIES_PATHS_INDEX_ORDERED.join(' | ')).each_with_index do |e,i| p = path(e) multi.zadd(File.join(instance, File.dirname(p)), i, File.basename(p)) end Dir[File.join(opts[:notifications_init],'*','subscription.xml')].each do |f| XML::Smart::open_unprotected(f) do |doc| doc.register_namespace 'np', 'http://riddl.org/ns/common-patterns/notifications-producer/2.0' key = File.basename(File.dirname(f)) url = doc.find('string(/np:subscription/@url)') multi.sadd("instance:#{id}/handlers",key) multi.set("instance:#{id}/handlers/#{key}/url",url) doc.find('/np:subscription/np:topic/*').each do |e| c = File.join(e.parent.attributes['id'],e.qname.name,e.text) multi.sadd("instance:#{id}/handlers/#{key}",c) multi.sadd("instance:#{id}/handlers/#{c}",key) end end rescue nil # all the ones that are not ok, are ignored end multi.set(File.join(instance, 'attributes', 'uuid'), uuid) multi.zadd(File.join(instance, 'attributes'), -2, 'uuid') multi.set(File.join(instance, 'attributes', 'info'), name) multi.zadd(File.join(instance, 'attributes'), -1, 'info') multi.set(File.join(instance, 'state', '@changed'), Time.now.xmlschema(3)) end @headers << Riddl::Header.new("CPEE-INSTANCE", id.to_s) @headers << Riddl::Header.new("CPEE-INSTANCE-URL", File.join(opts[:url].to_s,id.to_s)) @headers << Riddl::Header.new("CPEE-INSTANCE-UUID", uuid) Riddl::Parameter::Simple.new("id", id.to_s) end end #}}} class Info < Riddl::Implementation #{{{ def response opts = @a[0] id = @r[0].to_i unless opts[:redis].exists?("instance:#{id}/state") @status = 404 return end Riddl::Parameter::Complex.new("info","text/xml") do i = XML::Smart::string <<-END END i.to_s end end end #}}} class DeleteInstance < Riddl::Implementation #{{{ def response opts = @a[0] redis = opts[:redis] id = @r[0].to_i unless redis.exists?("instance:#{id}/state") @status = 404 return end empt = redis.keys("instance:#{id}/*").to_a redis.multi do |multi| multi.del empt multi.zrem 'instances', id end end end #}}} end