# This file is part of CPEE.
#
# CPEE is free software: you can redistribute it and/or modify it under the terms
# of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# CPEE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# CPEE (file COPYING in the main directory). If not, see
# .
require 'fileutils'
require 'redis'
require 'riddl/server'
require 'riddl/client'
require_relative 'redis'
require_relative 'message'
require_relative 'persistence'
require_relative 'statemachine'
require_relative 'implementation_properties'
require_relative 'implementation_notifications'
require_relative 'implementation_callbacks'
module CPEE
SERVER = File.expand_path(File.join(__dir__,'..','cpee.xml'))
PROPERTIES_PATHS_FULL = %w{
/p:properties/p:handlerwrapper
/p:properties/p:positions/p:*
/p:properties/p:positions/p:*/@*
/p:properties/p:dataelements/p:*
/p:properties/p:endpoints/p:*
/p:properties/p:attributes/p:*
/p:properties/p:transformation/p:*
/p:properties/p:transformation/p:*/@*
/p:properties/p:description
/p:properties/p:dslx
/p:properties/p:dsl
/p:properties/p:status/p:id
/p:properties/p:status/p:message
/p:properties/p:state/@changed
/p:properties/p:state
}
PROPERTIES_PATHS_INDEX_UNORDERED = %w{
/p:properties/p:positions/p:*
}
PROPERTIES_PATHS_INDEX_ORDERED = %w{
/p:properties/p:dataelements/p:*
/p:properties/p:endpoints/p:*
/p:properties/p:attributes/p:*
}
def self::implementation(opts)
opts[:instances] ||= File.expand_path(File.join(__dir__,'..','..','server','instances'))
opts[:global_handlerwrappers] ||= File.expand_path(File.join(__dir__,'..','..','server','handlerwrappers'))
opts[:handlerwrappers] ||= ''
opts[:topics] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','topics.xml'))
opts[:properties_init] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','properties.init'))
opts[:properties_empty] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','properties.empty'))
opts[:transformation_dslx] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','transformation_dslx.xsl'))
opts[:transformation_service] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','transformation.xml'))
opts[:empty_dslx] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','empty_dslx.xml'))
opts[:notifications_init] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','notifications'))
opts[:states] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','states.xml'))
opts[:backend_run] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','backend','run'))
opts[:backend_template] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','backend','instance.template'))
opts[:backend_opts] ||= 'opts.yaml'
opts[:watchdog_frequency] ||= 7
opts[:watchdog_start_off] ||= false
opts[:backend_instance] ||= 'instance.rb'
opts[:infinite_loop_stop] ||= 10000
### set redis_cmd to nil if you want to do global
### at least redis_path or redis_url and redis_db have to be set if you do global
opts[:redis_path] ||= 'redis.sock' # use e.g. /tmp/redis.sock for global stuff. Look it up in your redis config
opts[:redis_db] ||= 0
### optional redis stuff
opts[:redis_url] ||= nil
opts[:redis_cmd] ||= 'redis-server --port 0 --unixsocket #redis_path# --unixsocketperm 600 --pidfile #redis_pid# --dir #redis_db_dir# --dbfilename #redis_db_name# --databases 1 --save 900 1 --save 300 10 --save 60 10000 --rdbcompression yes --daemonize yes'
opts[:redis_pid] ||= 'redis.pid' # use e.g. /var/run/redis.pid if you do global. Look it up in your redis config
opts[:redis_db_name] ||= 'redis.rdb' # use e.g. /var/lib/redis.rdb for global stuff. Look it up in your redis config
CPEE::redis_connect opts
opts[:sse_keepalive_frequency] ||= 10
opts[:sse_connections] = {}
opts[:statemachine] = CPEE::StateMachine.new opts[:states], %w{running simulating replaying finishing stopping abandoned finished} do |id|
opts[:redis].get("instance:#{id}/state")
end
opts[:runtime_cmds] << [
"startclean", "Delete instances before starting.", Proc.new { |status|
Dir.glob(File.expand_path(File.join(opts[:instances],'*'))).each do |d|
FileUtils.rm_r(d) if File.basename(d) =~ /^\d+$/
end
}
]
Proc.new do
parallel do
CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db])
EM.add_periodic_timer(opts[:watchdog_frequency]) do ### start services
CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db])
end
EM.defer do ### catch all sse connections
CPEE::Notifications::sse_distributor(opts)
end
EM.add_periodic_timer(opts[:sse_keepalive_frequency]) do
CPEE::Notifications::sse_heartbeat(opts)
end
end
cleanup do
CPEE::cleanup_services(opts[:watchdog_start_off])
end
interface 'main' do
run CPEE::Instances, opts if get '*'
run CPEE::NewInstance, opts if post 'instance-new'
on resource '\d+' do |r|
run CPEE::Info, opts if get
run CPEE::DeleteInstance, opts if delete
end
end
interface 'properties' do |r|
id = r[:h]['RIDDL_DECLARATION_PATH'].split('/')[1].to_i
use CPEE::Properties::implementation(id.to_i, opts)
end
interface 'notifications' do |r|
id = r[:h]['RIDDL_DECLARATION_PATH'].split('/')[1].to_i
use CPEE::Notifications::implementation(id.to_i, opts)
end
interface 'callbacks' do |r|
id = r[:h]['RIDDL_DECLARATION_PATH'].split('/')[1].to_i
use CPEE::Callbacks::implementation(id.to_i, opts)
end
end
end
def self::watch_services(watchdog_start_off,url,path,db)
return if watchdog_start_off
EM.defer do
Dir[File.join(__dir__,'..','..','server','routing','*.rb')].each do |s|
s = s.sub(/\.rb$/,'')
pid = (File.read(s + '.pid').to_i rescue nil)
if (pid.nil? || !(Process.kill(0, pid) rescue false)) && !File.exist?(s + '.lock')
if url.nil?
system "#{s}.rb -p \"#{path}\" -d #{db} restart 1>/dev/null 2>&1"
else
system "#{s}.rb -u \"#{url}\" -d #{db} restart 1>/dev/null 2>&1"
end
puts "➡ Service #{File.basename(s,'.rb')} started ..."
end
end
end
end
def self::cleanup_services(watchdog_start_off)
return if watchdog_start_off
Dir[File.join(__dir__,'..','..','server','routing','*.rb')].each do |s|
s = s.sub(/\.rb$/,'')
pid = (File.read(s + '.pid').to_i rescue nil)
if !pid.nil? || (Process.kill(0, pid) rescue false)
system "#{s}.rb stop 1>/dev/null 2>&1"
puts "➡ Service #{File.basename(s,'.rb')} stopped ..."
end
end
end
class Instances < Riddl::Implementation #{{{
def response
redis = @a[0][:redis]
Riddl::Parameter::Complex.new("wis","text/xml") do
ins = XML::Smart::string('')
redis.zrevrange('instances',0,-1).each do |instance|
statekey = "instance:#{instance}/state"
attributes = "instance:#{instance}/attributes/"
info = redis.get(attributes + 'info')
uuid = redis.get(attributes + 'uuid')
state = redis.get(statekey)
state_changed = redis.get(File.join(statekey,'@changed'))
ins.root.add('instance', info, 'uuid' => uuid, 'id' => instance, 'state' => state, 'state_changed' => state_changed )
end
ins.to_s
end
end
end #}}}
class NewInstance < Riddl::Implementation #{{{
def path(e)
ret = []
until e.qname.name == 'properties'
ret << (e.class == XML::Smart::Dom::Attribute ? '@' : '') + e.qname.name
e = e.parent
end
File.join(*ret.reverse)
end
def response
opts = @a[0]
redis = opts[:redis]
doc = XML::Smart::open_unprotected(opts[:properties_init])
doc.register_namespace 'p', 'http://cpee.org/ns/properties/2.0'
name = @p[0].value
id = redis.zrevrange('instances', 0, 0).first.to_i + 1
uuid = SecureRandom.uuid
instance = 'instance:' + id.to_s
redis.multi do |multi|
multi.zadd('instances',id,id)
doc.root.find(PROPERTIES_PATHS_FULL.join(' | ')).each do |e|
if e.class == XML::Smart::Dom::Element && e.element_only?
val = e.find('*').map { |f| f.dump }.join
multi.set(File.join(instance, path(e)), val)
else
multi.set(File.join(instance, path(e)), e.text)
end
end
doc.root.find(PROPERTIES_PATHS_INDEX_UNORDERED.join(' | ')).each do |e|
p = path(e)
multi.sadd(File.join(instance, File.dirname(p)), File.basename(p))
end
doc.root.find(PROPERTIES_PATHS_INDEX_ORDERED.join(' | ')).each_with_index do |e,i|
p = path(e)
multi.zadd(File.join(instance, File.dirname(p)), i, File.basename(p))
end
Dir[File.join(opts[:notifications_init],'*','subscription.xml')].each do |f|
XML::Smart::open_unprotected(f) do |doc|
doc.register_namespace 'np', 'http://riddl.org/ns/common-patterns/notifications-producer/2.0'
key = File.basename(File.dirname(f))
url = doc.find('string(/np:subscription/@url)')
multi.sadd("instance:#{id}/handlers",key)
multi.set("instance:#{id}/handlers/#{key}/url",url)
doc.find('/np:subscription/np:topic/*').each do |e|
c = File.join(e.parent.attributes['id'],e.qname.name,e.text)
multi.sadd("instance:#{id}/handlers/#{key}",c)
multi.sadd("instance:#{id}/handlers/#{c}",key)
end
end rescue nil # all the ones that are not ok, are ignored
end
multi.set(File.join(instance, 'attributes', 'uuid'), uuid)
multi.zadd(File.join(instance, 'attributes'), -2, 'uuid')
multi.set(File.join(instance, 'attributes', 'info'), name)
multi.zadd(File.join(instance, 'attributes'), -1, 'info')
multi.set(File.join(instance, 'state', '@changed'), Time.now.xmlschema(3))
end
@headers << Riddl::Header.new("CPEE-INSTANCE", id.to_s)
@headers << Riddl::Header.new("CPEE-INSTANCE-URL", File.join(opts[:url].to_s,id.to_s))
@headers << Riddl::Header.new("CPEE-INSTANCE-UUID", uuid)
Riddl::Parameter::Simple.new("id", id.to_s)
end
end #}}}
class Info < Riddl::Implementation #{{{
def response
opts = @a[0]
id = @r[0].to_i
unless opts[:redis].exists?("instance:#{id}/state")
@status = 404
return
end
Riddl::Parameter::Complex.new("info","text/xml") do
i = XML::Smart::string <<-END
END
i.to_s
end
end
end #}}}
class DeleteInstance < Riddl::Implementation #{{{
def response
opts = @a[0]
redis = opts[:redis]
id = @r[0].to_i
unless redis.exists?("instance:#{id}/state")
@status = 404
return
end
empt = redis.keys("instance:#{id}/*").to_a
redis.multi do |multi|
multi.del empt
multi.zrem 'instances', id
end
end
end #}}}
end