# This file is part of CPEE. # # CPEE is free software: you can redistribute it and/or modify it under the terms # of the GNU General Public License as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # # CPEE is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A # PARTICULAR PURPOSE. See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along with # CPEE (file COPYING in the main directory). If not, see # . require 'charlock_holmes' require 'mimemagic' require 'base64' require 'get_process_mem' require 'cpee-eval-ruby/translation' class ConnectionWrapper < WEEL::ConnectionWrapperBase def self::loop_guard(arguments,id,count) # {{{ controller = arguments[0] return false if controller.attributes['nednoamol'] tsn = Time.now tso = controller.loop_guard[id][:timestamp] rescue Time.now controller.loop_guard[id] = { :count => count, :timestamp => tsn } # if we have more than 100 loop iterations and the last one took less than 2 seconds, we slow the hell down tso + 2 > tsn && count > 100 end # }}} def self::inform_state_change(arguments,newstate) # {{{ controller = arguments[0] controller.notify("state/change", :state => newstate) end # }}} def self::inform_syntax_error(arguments,err,code)# {{{ # TODO extract spot (code) where error happened for better error handling (ruby 3.1 only) # https://github.com/rails/rails/pull/45818/commits/3beb2aff3be712e44c34a588fbf35b79c0246ca5 puts err.message puts err.backtrace controller = arguments[0] mess = err.backtrace ? err.backtrace[0].gsub(/([\w -_]+):(\d+):in.*/,'\\1, Line \2: ') : '' mess += err.message controller.notify("description/error", :message => mess) end# }}} def self::inform_connectionwrapper_error(arguments,err) # {{{ controller = arguments[0] puts err.message puts err.backtrace controller.notify("executionhandler/error", :message => err.backtrace[0].gsub(/([\w -_]+):(\d+):in.*/,'\\1, Line \2: ') + err.message) end # }}} def self::inform_position_change(arguments,ipc={}) # {{{ controller = arguments[0] controller.notify("position/change", ipc) end # }}} def initialize(arguments,position=nil,continue=nil) # {{{ @controller = arguments[0] @handler_continue = continue @handler_position = position @handler_passthrough = nil @handler_returnValue = nil @handler_returnOptions = nil @handler_activity_uuid = Digest::MD5.hexdigest(Kernel::rand().to_s) @label = '' @guard_files = [] @guard_items = [] end # }}} def additional #{{{ { :attributes => @controller.attributes, :cpee => { 'base' => @controller.base_url, 'instance' => @controller.instance_id, 'instance_url' => @controller.instance_url, 'instance_uuid' => @controller.uuid }, :task => { 'label' => @label, 'id' => @handler_position } } end #}}} def proto_curl(parameters) #{{{ params = [] callback = Digest::MD5.hexdigest(Kernel::rand().to_s) (parameters[:arguments] || []).each do |s| if s.respond_to?(:mimetype) params << Riddl::Parameter::Complex.new(s.name.to_s,v.mimetype,v.value) else if s.name.to_s =~ /^_Q_/ params << Riddl::Parameter::Simple.new(s.name.to_s.sub(/^_Q_/,''),CPEE::ValueHelper::generate(s.value),:query) elsif s.name.to_s =~ /^_B_/ params << Riddl::Parameter::Simple.new(s.name.to_s.sub(/^_B_/,''),CPEE::ValueHelper::generate(s.value),:body) elsif s.name.to_s =~ /^_H_/ params << Riddl::Header.new(s.name.to_s.sub(/^_H_/,''),CPEE::ValueHelper::generate(s.value)) elsif s.name.to_s =~ /^_C_/ params << Riddl::Parameter::Complex.new(s.name.to_s.sub(/^_C_/,''),*CPEE::ValueHelper::generate(s.value).split(';',2)) else params << Riddl::Parameter::Simple.new(s.name.to_s,CPEE::ValueHelper::generate(s.value)) end end end params << Riddl::Header.new("CPEE-BASE",@controller.base_url) params << Riddl::Header.new("CPEE-INSTANCE",@controller.instance_id) params << Riddl::Header.new("CPEE-INSTANCE-URL",@controller.instance_url) params << Riddl::Header.new("CPEE-INSTANCE-UUID",@controller.uuid) params << Riddl::Header.new("CPEE-CALLBACK",File.join(@controller.instance_url,'callbacks',callback,'/')) params << Riddl::Header.new("CPEE-CALLBACK-ID",callback) params << Riddl::Header.new("CPEE-ACTIVITY",@handler_position) params << Riddl::Header.new("CPEE-LABEL",@label||'') params << Riddl::Header.new("CPEE-TWIN-TARGET",@controller.attributes['twin_target']) if @controller.attributes['twin_target'] @controller.attributes.each do |key,value| params << Riddl::Header.new("CPEE-ATTR-#{key.to_s.gsub(/_/,'-')}",value) end status = result = headers = nil begin tendpoint = @handler_endpoint.sub(/^http(s)?-(get|put|post|delete):/,'http\\1:') type = $2 || parameters[:method] || 'post' client = Riddl::Client.new(tendpoint) @handler_passthrough = callback @controller.callback(self,callback,:'activity-uuid' => @handler_activity_uuid, :label => @label, :activity => @handler_position) status, result, headers = client.request type => params @guard_files += result if status == 561 if @controller.attributes['twin_translate'] gettrans = Riddl::Client.new(@controller.attributes['twin_translate']) gtstatus, gtresult, gtheaders = gettrans.get if gtstatus >= 200 && gtstatus < 300 transwhat = case headers['CPEE-TWIN-TASKTYPE'] when 'i'; 'instantiation' when 'ir'; 'ipc-receive' when 'is'; 'ipc-send' else 'instantiation' end JSON::parse(gtresult.first.value.read).each do |e| if e['type'] == transwhat @handler_endpoint = e['endpoint'] if e['endpoint'] e['arguments']&.each do |k,a| if a.is_a? String hname = a.gsub(/-/,'_') a = headers[hname] if headers[hname] elsif a.is_a? Hash a.each do |k_ht, a_ht| hname = a_ht.gsub(/-/,'_') a[k_ht] = headers[hname] if headers[hname] end end params.each do |p| if p.name == k if a.is_a? String p.value = a elsif a.is_a? Hash ohash = JSON::parse(p.value) rescue {} ohash.merge!(a) p.value = JSON.generate(ohash) end end end end end end end else @handler_endpoint = @handler_endpoint_orig end params.delete_if { |p| p.name == 'original_endpoint' } end end while status == 561 if status < 200 || status >= 300 headers['CPEE_SALVAGE'] = true c = result[0]&.value c = c.read if c.respond_to? :read callback([ Riddl::Parameter::Complex.new('error','application/json',StringIO.new(JSON::generate({ 'status' => status, 'error' => c }))) ], headers) else if headers['CPEE_CALLBACK'] && headers['CPEE_CALLBACK'] == 'true' && result.any? headers['CPEE_UPDATE'] = true callback result, headers elsif headers['CPEE_CALLBACK'] && headers['CPEE_CALLBACK'] == 'true' && result.empty? if headers['CPEE_INSTANTIATION'] @controller.notify("task/instantiation", :'activity-uuid' => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :received => CPEE::ValueHelper.parse(headers['CPEE_INSTANTIATION'])) end if headers['CPEE_EVENT'] @controller.notify("task/#{headers['CPEE_EVENT'].gsub(/[^\w_-]/,'')}", :'activity-uuid' => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint) end # do nothing, later on things will happend else callback result, headers end end end #}}} def activity_handle(passthrough, parameters) # {{{ raise "Wrong endpoint" if @handler_endpoint.nil? || @handler_endpoint.empty? @label = parameters[:label] @anno = parameters.delete(:annotations) rescue nil @controller.notify("status/resource_utilization", :mib => GetProcessMem.new.mb, **Process.times.to_h) @controller.notify("activity/calling", :'activity-uuid' => @handler_activity_uuid, :label => @label, :activity => @handler_position, :passthrough => passthrough, :endpoint => @handler_endpoint, :parameters => parameters, :annotations => @anno) if passthrough.to_s.empty? proto_curl parameters else @controller.callback(self,passthrough,:'activity-uuid' => @handler_activity_uuid, :label => @label, :activity => @handler_position) @handler_passthrough = passthrough end end # }}} def activity_manipulate_handle(parameters) #{{{ @label = parameters[:label] end #}}} def activity_result_value # {{{ @handler_returnValue end # }}} def activity_result_options # {{{ @handler_returnOptions end # }}} def activity_stop # {{{ unless @handler_passthrough.nil? @controller.cancel_callback(@handler_passthrough) end end # }}} def activity_passthrough_value # {{{ @handler_passthrough end # }}} def activity_no_longer_necessary # {{{ true end # }}} def activity_uuid #{{{ @handler_activity_uuid end #}}} def inform_activity_done # {{{ @controller.notify("activity/done", :'activity-uuid' => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position) @controller.notify("status/resource_utilization", :mib => GetProcessMem.new.mb, **Process.times.to_h) end # }}} def inform_activity_manipulate # {{{ @controller.notify("activity/manipulating", :'activity-uuid' => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position) end # }}} def inform_activity_failed(err) # {{{ puts err.message puts err.backtrace @controller.notify("activity/failed", :'activity-uuid' => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :message => err.message, :line => err.backtrace[0].match(/(.*?):(\d+):/)[2], :where => err.backtrace[0].match(/(.*?):(\d+):/)[1]) end # }}} def inform_manipulate_change(status,changed_dataelements,changed_endpoints,dataelements,endpoints) # {{{ unless status.nil? @controller.notify("status/change", :'activity-uuid' => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :id => status.id, :message => status.message) end unless changed_dataelements.nil? || changed_dataelements.empty? de = dataelements.slice(*changed_dataelements).transform_values { |v| enc = CPEE::EvalRuby::Translation::detect_encoding(v); (enc == 'OTHER' ? v : (v.encode('UTF-8',enc) rescue CPEE::EvalRuby::Translation::convert_to_base64(v))) } @controller.notify("dataelements/change", :'activity-uuid' => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :changed => changed_dataelements, :values => de) end unless changed_endpoints.nil? || changed_endpoints.empty? @controller.notify("endpoints/change", :'activity-uuid' => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :changed => changed_endpoints, :values => endpoints.slice(*changed_endpoints)) end end # }}} def vote_sync_after # {{{ @controller.vote("activity/syncing_after", :'activity-uuid' => @handler_activity_uuid, :endpoint => @handler_endpoint, :activity => @handler_position, :label => @label) end # }}} def vote_sync_before(parameters=nil) # {{{ @controller.vote("activity/syncing_before", :'activity-uuid' => @handler_activity_uuid, :endpoint => @handler_endpoint, :activity => @handler_position, :label => @label, :parameters => parameters) end # }}} def callback(result=nil,options={}) #{{{ status, ret, headers = Riddl::Client.new(@controller.url_result_transformation).request 'put' => result recv = if status >= 200 && status < 300 JSON::parse(ret[0].value.read) else nil end @controller.notify("activity/receiving", :'activity-uuid' => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :received => recv, :annotations => @anno) @guard_files += result @guard_files += ret if options['CPEE_INSTANTIATION'] @controller.notify("task/instantiation", :'activity-uuid' => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :received => CPEE::ValueHelper.parse(options['CPEE_INSTANTIATION'])) end if options['CPEE_EVENT'] @controller.notify("task/#{options['CPEE_EVENT'].gsub(/[^\w_-]/,'')}", :'activity-uuid' => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :received => recv) else @handler_returnValue = recv @handler_returnOptions = options end if options['CPEE_STATUS'] @controller.notify("activity/status", :'activity-uuid' => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :status => options['CPEE_STATUS']) end if options['CPEE_UPDATE'] @handler_continue.continue WEEL::Signal::UpdateAgain else @controller.cancel_callback(@handler_passthrough) @handler_passthrough = nil if options['CPEE_SALVAGE'] @handler_continue.continue WEEL::Signal::Salvage elsif options['CPEE_STOP'] @handler_continue.continue WEEL::Signal::Stop else @handler_continue.continue end end end #}}} def mem_guard() #{{{ @guard_files.delete_if do |p| if p&.respond_to?(:close) p.close elsif p&.value&.respond_to?(:close) p.value.close end true end GC.start end #}}} def prepare(struct, endpoints, parameters) #{{{ @handler_endpoint = endpoints.is_a?(Array) ? endpoints.map{ |ep| struct.endpoints[ep] }.compact : struct.endpoints[endpoints] if @controller.attributes['twin_engine'] @handler_endpoint_orig = @handler_endpoint @handler_endpoint = @controller.attributes['twin_engine'].to_s + '?original_endpoint=' + Riddl::Protocols::Utils::escape(@handler_endpoint) end params = parameters.dup params[:arguments] = params[:arguments].dup if params[:arguments] params[:arguments]&.map! do |ele| t = ele.dup if t.value.is_a?(WEEL::ProcString) send = [] send.push Riddl::Parameter::Simple::new('code',t.value.code) send.push Riddl::Parameter::Complex::new('dataelements','application/json', JSON::generate(struct.data)) send.push Riddl::Parameter::Complex::new('local','application/json', JSON::generate(struct.local)) if struct.local send.push Riddl::Parameter::Complex::new('endpoints','application/json', JSON::generate(struct.endpoints)) send.push Riddl::Parameter::Complex::new('additional','application/json', JSON::generate(struct.additional)) status, ret, headers = Riddl::Client.new(@controller.url_code).request 'put' => send recv = if status >= 200 && status < 300 ret[0].value else nil end t.value = recv end t end params end #}}} def test_condition(dataelements,endpoints,local,additional,code,args={}) #{{{ send = [] send.push Riddl::Parameter::Simple::new('code',code) send.push Riddl::Parameter::Complex::new('dataelements','application/json', JSON::generate(dataelements)) send.push Riddl::Parameter::Complex::new('local','application/json', JSON::generate(local)) if local send.push Riddl::Parameter::Complex::new('endpoints','application/json', JSON::generate(endpoints)) send.push Riddl::Parameter::Complex::new('additional','application/json', JSON::generate(additional)) status, ret, headers = Riddl::Client.new(@controller.url_code).request 'put' => send recv = if status >= 200 && status < 300 ret[0].value else nil end recv = 'false' unless receive recv = (recv == 'false' || recv == 'null' || recv == 'nil' ? false : true) @controller.notify("gateway/decide", :instance_uuid => @controller.uuid, :code => code, :condition => recv) recv end #}}} def manipulate(readonly,lock,dataelements,endpoints,status,local,additional,code,where,result=nil,options=nil) #{{{ lock.synchronize do send = [] send.push Riddl::Parameter::Simple::new('code',code) send.push Riddl::Parameter::Complex::new('dataelements','application/json', JSON::generate(dataelements)) send.push Riddl::Parameter::Complex::new('local','application/json', JSON::generate(local)) if local send.push Riddl::Parameter::Complex::new('endpoints','application/json', JSON::generate(endpoints)) send.push Riddl::Parameter::Complex::new('additional','application/json', JSON::generate(additional)) send.push Riddl::Parameter::Complex::new('status','application/json', JSON::generate(status)) if status send.push Riddl::Parameter::Complex::new('call_result','application/json', JSON::generate(result)) send.push Riddl::Parameter::Complex::new('call_headers','application/json', JSON::generate(options)) stat, ret, headers = Riddl::Client.new(@controller.url_code).request 'put' => send if stat >= 200 && stat < 300 ret.shift # drop result signal = changed_status = nil changed_dataelements = changed_local = changed_endpoints = [] signal = ret.shift.value if ret.any? && ret[0].name == 'signal' changed_dataelements = JSON::parse(ret.shift.value.read) if ret.any? && ret[0].name == 'changed_dataelements' changed_endpoints = JSON::parse(ret.shift.value.read) if ret.any? && ret[0].name == 'changed_endpoints' changed_status = JSON::parse(ret.shift.value.read) if ret.any? && ret[0].name == 'changed_status' struct = if readonly WEEL::ReadStructure.new(dataelements,endpoints,local,additional) else WEEL::ManipulateStructure.new(dataelements, endpoints, status, local, additional) end struct.update(changed_dataelements,changed_endpoints,changed_status) struct else nil end end end #}}} def split_branches(branches) # factual, so for inclusive or [[a],[b],[c,d,e]]{{{ @controller.notify("gateway/split", :instance_uuid => @controller.uuid, :branches => branches) end #}}} def join_branches(branches) # factual, so for inclusive or [[a],[b],[c,d,e]]{{{ @controller.notify("gateway/join", :instance_uuid => @controller.uuid, :branches => branches) end #}}} end