lib/cpee/implementation.rb in cpee-2.1.39 vs lib/cpee/implementation.rb in cpee-2.1.41
- old
+ new
@@ -52,11 +52,11 @@
/p:properties/p:dataelements/p:*
/p:properties/p:endpoints/p:*
/p:properties/p:attributes/p:*
}
def self::implementation(opts)
- opts[:see_instances] ||= opts[:see_instances].nil? ? true : opts[:see_instances]
+ opts[:see_instances] ||= opts[:see_instances].nil? ? false : opts[:see_instances]
opts[:instances] ||= File.expand_path(File.join(__dir__,'..','..','server','instances'))
opts[:global_executionhandlers] ||= File.expand_path(File.join(__dir__,'..','..','server','executionhandlers'))
opts[:executionhandlers] ||= ''
opts[:topics] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','topics.xml'))
@@ -67,10 +67,13 @@
opts[:notifications_init] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','notifications'))
opts[:states] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','states.xml'))
opts[:watchdog_frequency] ||= 7
opts[:watchdog_start_off] ||= false
opts[:infinite_loop_stop] ||= 10000
+ opts[:workers] ||= 1
+ opts[:workers_single] ||= ['end','forward-votes']
+ opts[:workers_multi] ||= ['persist','forward-events']
opts[:dashing_frequency] ||= 3
opts[:dashing_target] ||= nil
### set redis_cmd to nil if you want to do global
@@ -105,15 +108,16 @@
require h
end unless opts[:global_executionhandlers].nil? || opts[:global_executionhandlers].strip == ''
Dir[File.join(opts[:executionhandlers],'*','execution.rb')].each do |h|
require h
end unless opts[:executionhandlers].nil? || opts[:executionhandlers].strip == ''
+ CPEE::Message::set_workers(opts[:workers])
parallel do
- CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db])
+ CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db],opts[:workers],opts[:workers_single],opts[:workers_multi])
EM.add_periodic_timer(opts[:watchdog_frequency]) do ### start services
- CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db])
+ CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db],opts[:workers],opts[:workers_single],opts[:workers_multi])
end
EM.defer do ### catch all sse connections
CPEE::Notifications::sse_distributor(opts)
end
EM.add_periodic_timer(opts[:sse_keepalive_frequency]) do
@@ -198,43 +202,64 @@
use CPEE::Callbacks::implementation(id.to_i, opts)
end
end
end
- def self::watch_services(watchdog_start_off,url,path,db)
+ def self::watch_services(watchdog_start_off,url,path,db,workers,workers_single,workers_multi)
return if watchdog_start_off
- EM.defer do
- Dir[File.join(__dir__,'..','..','server','routing','*.rb')].each do |s|
- s = s.sub(/\.rb$/,'')
- pid = (File.read(s + '.pid').to_i rescue nil)
- if (pid.nil? || !(Process.kill(0, pid) rescue false)) && !File.exist?(s + '.lock')
- if url.nil?
- system "#{s}.rb -p \"#{path}\" -d #{db} restart 1>/dev/null 2>&1"
- else
- system "#{s}.rb -u \"#{url}\" -d #{db} restart 1>/dev/null 2>&1"
- end
- puts "➡ Service #{File.basename(s,'.rb')} started ..."
- end
- end
+ EM.defer do
+ workers_single.each do |s|
+ s = File.join(__dir__,'..','..','server','routing',s)
+ next if File.exist?(s + '.lock')
+ pid = (File.read(s + '.pid').to_i rescue nil)
+ if (pid.nil? || !(Process.kill(0, pid) rescue false))
+ if url.nil?
+ system "#{s}.rb -p \"#{path}\" -d #{db} restart 1>/dev/null 2>&1"
+ else
+ system "#{s}.rb -u \"#{url}\" -d #{db} restart 1>/dev/null 2>&1"
+ end
+ puts "➡ Service #{File.basename(s)} started ..."
+ end
+ end
+ workers_multi.each do |s|
+ s = File.join(__dir__,'..','..','server','routing',s.to_s)
+ next if File.exist?(s + '.lock')
+ (0...workers).each do |w|
+ w = '%02i' % w
+ pid = (File.read(s + '-' + w + '.pid').to_i rescue nil)
+ if (pid.nil? || !(Process.kill(0, pid) rescue false))
+ cmd = if url.nil?
+ "-p \"#{path}\" -d #{db} -w #{w} restart"
+ else
+ "-u \"#{url}\" -d #{db} -w #{w} restart"
+ end
+ system "#{s}.rb " + cmd + " 1>/dev/null 2>&1"
+ puts "➡ Service #{File.basename(s)}-#{w} (#{cmd}) started ..."
+ end
+ end
+ end
end
end
def self::cleanup_services(watchdog_start_off)
return if watchdog_start_off
- Dir[File.join(__dir__,'..','..','server','routing','*.rb')].each do |s|
- s = s.sub(/\.rb$/,'')
- pid = (File.read(s + '.pid').to_i rescue nil)
+ Dir[File.join(__dir__,'..','..','server','routing','*.pid')].each do |s|
+ pid = (File.read(s).to_i rescue nil)
if !pid.nil? || (Process.kill(0, pid) rescue false)
- system "#{s}.rb stop 1>/dev/null 2>&1"
- puts "➡ Service #{File.basename(s,'.rb')} stopped ..."
+ f = s.sub(/(-(\d+))?\.pid$/,'.rb')
+ if $2.nil?
+ system "#{f} stop 1>/dev/null 2>&1"
+ else
+ system "#{f} -w #{$2} stop 1>/dev/null 2>&1"
+ end
+ puts "➡ Service #{File.basename(s,'.pid')} stopped ..."
end
end
end
class Instances < Riddl::Implementation #{{{
def response
opts = @a[0]
- pp @request[:env]['REMOTE_ADDR']
if opts[:see_instances] || @h['SEE_INSTANCES'] == 'true'
Riddl::Parameter::Complex.new("wis","text/xml") do
ins = XML::Smart::string('<instances/>')
CPEE::Persistence::each_object(opts) do |instance|
info = CPEE::Persistence::extract_item(instance,opts,'attributes/info')
@@ -311,11 +336,11 @@
content = {
:state => 'ready',
:attributes => CPEE::Persistence::extract_list(id,opts,'attributes').to_h
}
- CPEE::Message::send(:event,'state/change',File.join(opts[:url],'/'),id,uuid,name,content,redis)
+ CPEE::Message::send(:event,'state/change',File.join(opts[:url],'/'),id,uuid,name,content,redis,opts[:workers])
@headers << Riddl::Header.new("CPEE-INSTANCE", id.to_s)
@headers << Riddl::Header.new("CPEE-INSTANCE-URL", File.join(opts[:url].to_s,id.to_s,'/'))
@headers << Riddl::Header.new("CPEE-INSTANCE-UUID", uuid)
@@ -361,18 +386,15 @@
state = CPEE::Persistence::extract_item(id,opts,'state')
if state == 'stopped' || state == 'ready'
CPEE::Message::send(:event,'state/change',File.join(opts[:url],'/'),id,content[:attributes]['uuid'],content[:attributes]['info'],content,redis)
end
- EM::add_timer(30) do
- empt = CPEE::Persistence::keys(id,opts).to_a
- ### is there to avoid that returning calls get intro problems
- ### as we have add_timer now, it should work without this
- # empt.delete_if{|e| e =~ /\/handlers/ }
- redis.multi do |multi|
- multi.del empt
- multi.zrem 'instances', id
+ empt = CPEE::Persistence::keys(id,opts).to_a
+ redis.multi do |multi|
+ empt.each do |e|
+ multi.expire e, 30
end
+ multi.zrem 'instances', id
end
end
end #}}}
end