lib/cpee/implementation.rb in cpee-2.1.39 vs lib/cpee/implementation.rb in cpee-2.1.41

- old
+ new

@@ -52,11 +52,11 @@ /p:properties/p:dataelements/p:* /p:properties/p:endpoints/p:* /p:properties/p:attributes/p:* } def self::implementation(opts) - opts[:see_instances] ||= opts[:see_instances].nil? ? true : opts[:see_instances] + opts[:see_instances] ||= opts[:see_instances].nil? ? false : opts[:see_instances] opts[:instances] ||= File.expand_path(File.join(__dir__,'..','..','server','instances')) opts[:global_executionhandlers] ||= File.expand_path(File.join(__dir__,'..','..','server','executionhandlers')) opts[:executionhandlers] ||= '' opts[:topics] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','topics.xml')) @@ -67,10 +67,13 @@ opts[:notifications_init] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','notifications')) opts[:states] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','states.xml')) opts[:watchdog_frequency] ||= 7 opts[:watchdog_start_off] ||= false opts[:infinite_loop_stop] ||= 10000 + opts[:workers] ||= 1 + opts[:workers_single] ||= ['end','forward-votes'] + opts[:workers_multi] ||= ['persist','forward-events'] opts[:dashing_frequency] ||= 3 opts[:dashing_target] ||= nil ### set redis_cmd to nil if you want to do global @@ -105,15 +108,16 @@ require h end unless opts[:global_executionhandlers].nil? || opts[:global_executionhandlers].strip == '' Dir[File.join(opts[:executionhandlers],'*','execution.rb')].each do |h| require h end unless opts[:executionhandlers].nil? || opts[:executionhandlers].strip == '' + CPEE::Message::set_workers(opts[:workers]) parallel do - CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db]) + CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db],opts[:workers],opts[:workers_single],opts[:workers_multi]) EM.add_periodic_timer(opts[:watchdog_frequency]) do ### start services - CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db]) + CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db],opts[:workers],opts[:workers_single],opts[:workers_multi]) end EM.defer do ### catch all sse connections CPEE::Notifications::sse_distributor(opts) end EM.add_periodic_timer(opts[:sse_keepalive_frequency]) do @@ -198,43 +202,64 @@ use CPEE::Callbacks::implementation(id.to_i, opts) end end end - def self::watch_services(watchdog_start_off,url,path,db) + def self::watch_services(watchdog_start_off,url,path,db,workers,workers_single,workers_multi) return if watchdog_start_off - EM.defer do - Dir[File.join(__dir__,'..','..','server','routing','*.rb')].each do |s| - s = s.sub(/\.rb$/,'') - pid = (File.read(s + '.pid').to_i rescue nil) - if (pid.nil? || !(Process.kill(0, pid) rescue false)) && !File.exist?(s + '.lock') - if url.nil? - system "#{s}.rb -p \"#{path}\" -d #{db} restart 1>/dev/null 2>&1" - else - system "#{s}.rb -u \"#{url}\" -d #{db} restart 1>/dev/null 2>&1" - end - puts "➡ Service #{File.basename(s,'.rb')} started ..." - end - end + EM.defer do + workers_single.each do |s| + s = File.join(__dir__,'..','..','server','routing',s) + next if File.exist?(s + '.lock') + pid = (File.read(s + '.pid').to_i rescue nil) + if (pid.nil? || !(Process.kill(0, pid) rescue false)) + if url.nil? + system "#{s}.rb -p \"#{path}\" -d #{db} restart 1>/dev/null 2>&1" + else + system "#{s}.rb -u \"#{url}\" -d #{db} restart 1>/dev/null 2>&1" + end + puts "➡ Service #{File.basename(s)} started ..." + end + end + workers_multi.each do |s| + s = File.join(__dir__,'..','..','server','routing',s.to_s) + next if File.exist?(s + '.lock') + (0...workers).each do |w| + w = '%02i' % w + pid = (File.read(s + '-' + w + '.pid').to_i rescue nil) + if (pid.nil? || !(Process.kill(0, pid) rescue false)) + cmd = if url.nil? + "-p \"#{path}\" -d #{db} -w #{w} restart" + else + "-u \"#{url}\" -d #{db} -w #{w} restart" + end + system "#{s}.rb " + cmd + " 1>/dev/null 2>&1" + puts "➡ Service #{File.basename(s)}-#{w} (#{cmd}) started ..." + end + end + end end end def self::cleanup_services(watchdog_start_off) return if watchdog_start_off - Dir[File.join(__dir__,'..','..','server','routing','*.rb')].each do |s| - s = s.sub(/\.rb$/,'') - pid = (File.read(s + '.pid').to_i rescue nil) + Dir[File.join(__dir__,'..','..','server','routing','*.pid')].each do |s| + pid = (File.read(s).to_i rescue nil) if !pid.nil? || (Process.kill(0, pid) rescue false) - system "#{s}.rb stop 1>/dev/null 2>&1" - puts "➡ Service #{File.basename(s,'.rb')} stopped ..." + f = s.sub(/(-(\d+))?\.pid$/,'.rb') + if $2.nil? + system "#{f} stop 1>/dev/null 2>&1" + else + system "#{f} -w #{$2} stop 1>/dev/null 2>&1" + end + puts "➡ Service #{File.basename(s,'.pid')} stopped ..." end end end class Instances < Riddl::Implementation #{{{ def response opts = @a[0] - pp @request[:env]['REMOTE_ADDR'] if opts[:see_instances] || @h['SEE_INSTANCES'] == 'true' Riddl::Parameter::Complex.new("wis","text/xml") do ins = XML::Smart::string('<instances/>') CPEE::Persistence::each_object(opts) do |instance| info = CPEE::Persistence::extract_item(instance,opts,'attributes/info') @@ -311,11 +336,11 @@ content = { :state => 'ready', :attributes => CPEE::Persistence::extract_list(id,opts,'attributes').to_h } - CPEE::Message::send(:event,'state/change',File.join(opts[:url],'/'),id,uuid,name,content,redis) + CPEE::Message::send(:event,'state/change',File.join(opts[:url],'/'),id,uuid,name,content,redis,opts[:workers]) @headers << Riddl::Header.new("CPEE-INSTANCE", id.to_s) @headers << Riddl::Header.new("CPEE-INSTANCE-URL", File.join(opts[:url].to_s,id.to_s,'/')) @headers << Riddl::Header.new("CPEE-INSTANCE-UUID", uuid) @@ -361,18 +386,15 @@ state = CPEE::Persistence::extract_item(id,opts,'state') if state == 'stopped' || state == 'ready' CPEE::Message::send(:event,'state/change',File.join(opts[:url],'/'),id,content[:attributes]['uuid'],content[:attributes]['info'],content,redis) end - EM::add_timer(30) do - empt = CPEE::Persistence::keys(id,opts).to_a - ### is there to avoid that returning calls get intro problems - ### as we have add_timer now, it should work without this - # empt.delete_if{|e| e =~ /\/handlers/ } - redis.multi do |multi| - multi.del empt - multi.zrem 'instances', id + empt = CPEE::Persistence::keys(id,opts).to_a + redis.multi do |multi| + empt.each do |e| + multi.expire e, 30 end + multi.zrem 'instances', id end end end #}}} end