server/handlerwrappers/default.rb in cpee-1.5.27 vs server/handlerwrappers/default.rb in cpee-2.0

- old
+ new

@@ -11,30 +11,39 @@ # You should have received a copy of the GNU General Public License along with # CPEE (file COPYING in the main directory). If not, see # <http://www.gnu.org/licenses/>. class DefaultHandlerWrapper < WEEL::HandlerWrapperBase + def self::loop_guard(arguments,id,count) # {{{ + controller = arguments[0] + tsn = Time.now + tso = controller.loop_guard[id][:timestamp] rescue Time.now + controller.loop_guard[id] = { :count => count, :timestamp => tsn } + # if we have more than 100 loop iterations and the last one took less than 2 seconds, we slow the hell down + tso + 2 > tsn && count > 100 + end # }}} + def self::inform_state_change(arguments,newstate) # {{{ controller = arguments[0] - controller.serialize_state! - controller.notify("state/change", :instance => controller.instance, :instance_uuid => controller.uuid, :state => newstate, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => controller.attributes_translated) - controller.finalize_if_finished + controller.notify("state/change", :state => newstate, :timestamp => Time.now.xmlschema(3)) end # }}} + def self::inform_state_change(arguments,newstate) # {{{ + controller = arguments[0] + controller.notify("state/change", :state => newstate, :timestamp => Time.now.xmlschema(3)) + end # }}} def self::inform_syntax_error(arguments,err,code)# {{{ controller = arguments[0] - controller.notify("description/error", :instance => controller.instance, :instance_uuid => controller.uuid, :message => err.message, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z")) + controller.notify("description/error", :message => err.message) end# }}} def self::inform_handlerwrapper_error(arguments,err) # {{{ controller = arguments[0] - controller.notify("handlerwrapper/error", :instance => controller.instance, :instance_uuid => controller.uuid, :message => err.message, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z")) + p err.message + p err.backtrace + controller.notify("handlerwrapper/error", :message => err.message) end # }}} def self::inform_position_change(arguments,ipc={}) # {{{ controller = arguments[0] - controller.serialize_positions! - ipc[:instance] = controller.instance - ipc[:instance_uuid] = controller.uuid - ipc[:timestamp] = Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z") controller.notify("position/change", ipc) end # }}} def initialize(arguments,position=nil,continue=nil) # {{{ @controller = arguments[0] @@ -63,14 +72,10 @@ t end params end #}}} - def additional - { :attributes => @controller.attributes_translated } rescue {} - end - def proto_curl(parameters) #{{{ params = [] callback = Digest::MD5.hexdigest(Kernel::rand().to_s) (parameters[:arguments] || []).each do |s| if s.respond_to?(:mimetype) @@ -89,11 +94,11 @@ end end end params << Riddl::Header.new("CPEE-BASE",@controller.base_url) - params << Riddl::Header.new("CPEE-INSTANCE",@controller.instance) + params << Riddl::Header.new("CPEE-INSTANCE",@controller.instance_id) params << Riddl::Header.new("CPEE-INSTANCE-URL",@controller.instance_url) params << Riddl::Header.new("CPEE-INSTANCE-UUID",@controller.uuid) params << Riddl::Header.new("CPEE-CALLBACK",@controller.instance_url + '/callbacks/' + callback) params << Riddl::Header.new("CPEE-CALLBACK-ID",callback) params << Riddl::Header.new("CPEE-ACTIVITY",@handler_position) @@ -106,22 +111,22 @@ tendpoint = @handler_endpoint.sub(/^http(s)?-(get|put|post|delete):/,'http\\1:') type = $2 || parameters[:method] || 'post' client = Riddl::Client.new(tendpoint) - @controller.callbacks[callback] = CPEE::Callback.new("callback activity: #{@handler_position}",self,:callback,nil,nil,:http) + @controller.callback(self,callback,:activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position) @handler_passthrough = callback status, result, headers = client.request type => params if status < 200 || status >= 300 headers['CPEE_SALVAGE'] = true c = result[0]&.value c = c.read if c.respond_to? :read callback([ Riddl::Parameter::Complex.new('error','application/json',StringIO.new(JSON::generate({ 'status' => status, 'error' => c }))) ], headers) else if headers['CPEE_INSTANTIATION'] - @controller.notify("task/instantiation", :activity_uuid => @handler_activity_uuid, :instance => @controller.instance, :label => @label, :instance_name => @controller.info, :instance_uuid => @controller.uuid, :activity => @handler_position, :endpoint => @handler_endpoint, :received => CPEE::ValueHelper.parse(headers['CPEE_INSTANTIATION']), :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated) + @controller.notify("task/instantiation", :activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :received => CPEE::ValueHelper.parse(headers['CPEE_INSTANTIATION'])) end if headers['CPEE_CALLBACK'] && headers['CPEE_CALLBACK'] == 'true' && result.any? headers['CPEE_UPDATE'] = true callback result, headers elsif headers['CPEE_CALLBACK'] && headers['CPEE_CALLBACK'] == 'true' && result.empty? @@ -136,15 +141,15 @@ raise "Wrong endpoint" if @handler_endpoint.nil? || @handler_endpoint.empty? @label = parameters[:label] @sensors = parameters.dig(:stream,:sensors) @aggregators = parameters.dig(:stream,:aggregators) @costs = parameters.dig(:stream,:costs) - @controller.notify("activity/calling", :activity_uuid => @handler_activity_uuid, :instance => @controller.instance, :instance_uuid => @controller.uuid, :label => @label, :instance_name => @controller.info, :activity => @handler_position, :passthrough => passthrough, :endpoint => @handler_endpoint, :parameters => parameters, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated) + @controller.notify("activity/calling", :activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position, :passthrough => passthrough, :endpoint => @handler_endpoint, :parameters => parameters) if passthrough.to_s.empty? proto_curl parameters else - @controller.callbacks[passthrough] = CPEE::Callback.new("callback activity: #{@handler_position}",self,:callback,nil,nil,:http) + @controller.callback(self,passthrough,:activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position) @handler_passthrough = passthrough end end # }}} def activity_manipulate_handle(parameters) #{{{ @label = parameters[:label] @@ -157,11 +162,11 @@ @handler_returnOptions end # }}} def activity_stop # {{{ unless @handler_passthrough.nil? - @controller.callbacks.delete(@handler_passthrough) + @controller.cancel_callback(@handler_passthrough) end end # }}} def activity_passthrough_value # {{{ @handler_passthrough end # }}} @@ -169,40 +174,37 @@ def activity_no_longer_necessary # {{{ true end # }}} def inform_activity_done # {{{ - @controller.notify("activity/done", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :instance => @controller.instance, :label => @label, :instance_name => @controller.info, :instance_uuid => @controller.uuid, :activity => @handler_position, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated) + @controller.notify("activity/done", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position) end # }}} def inform_activity_manipulate # {{{ - @controller.notify("activity/manipulating", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :instance => @controller.instance, :label => @label, :instance_name => @controller.info, :instance_uuid => @controller.uuid, :activity => @handler_position, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated) + @controller.notify("activity/manipulating", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position) end # }}} def inform_activity_failed(err) # {{{ puts err.message puts err.backtrace - @controller.notify("activity/failed", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :instance_name => @controller.info, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :message => err.message, :line => err.backtrace[0].match(/(.*?):(\d+):/)[2], :where => err.backtrace[0].match(/(.*?):(\d+):/)[1], :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated) + @controller.notify("activity/failed", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :message => err.message, :line => err.backtrace[0].match(/(.*?):(\d+):/)[2], :where => err.backtrace[0].match(/(.*?):(\d+):/)[1]) end # }}} def inform_manipulate_change(status,changed_dataelements,changed_endpoints,dataelements,endpoints) # {{{ unless status.nil? - @controller.serialize_status! - @controller.notify("status/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :instance_name => @controller.info, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :id => status.id, :message => status.message, :attributes => @controller.attributes_translated, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z")) + @controller.notify("status/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :id => status.id, :message => status.message) end unless changed_dataelements.nil? - @controller.serialize_dataelements! - @controller.notify("dataelements/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :instance_name => @controller.info, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :changed => changed_dataelements, :values => dataelements, :attributes => @controller.attributes_translated, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z")) + @controller.notify("dataelements/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :changed => changed_dataelements, :values => dataelements) end unless changed_endpoints.nil? - @controller.serialize_endpoints! - @controller.notify("endpoints/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :instance_name => @controller.info, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :changed => changed_endpoints, :values => endpoints, :attributes => @controller.attributes_translated, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z")) + @controller.notify("endpoints/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :changed => changed_endpoints, :values => endpoints) end end # }}} def vote_sync_after # {{{ - @controller.call_vote("activity/syncing_after", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z")) + @controller.vote("activity/syncing_after", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :activity => @handler_position, :label => @label) end # }}} def vote_sync_before(parameters=nil) # {{{ - @controller.call_vote("activity/syncing_before", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :parameters => parameters, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z")) + @controller.vote("activity/syncing_before", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :activity => @handler_position, :label => @label, :parameters => parameters) end # }}} def simplify_result(result) if result.length == 1 if result[0].is_a? Riddl::Parameter::Simple @@ -257,21 +259,21 @@ end end end def callback(result=nil,options={}) - @controller.notify("activity/receiving", :activity_uuid => @handler_activity_uuid, :instance => @controller.instance, :label => @label, :instance_name => @controller.info, :instance_uuid => @controller.uuid, :activity => @handler_position, :endpoint => @handler_endpoint, :received => structurize_result(result), :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated, :sensors => @sensors, :aggregators => @aggregators, :costs => @costs) + @controller.notify("activity/receiving", :activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :received => structurize_result(result), :sensors => @sensors, :aggregators => @aggregators, :costs => @costs) result = simplify_result(result) @handler_returnValue = result @handler_returnOptions = options if options['CPEE_UPDATE'] if options['CPEE_UPDATE_STATUS'] - @controller.notify("activity/status", :activity_uuid => @handler_activity_uuid, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :endpoint => @handler_endpoint, :status => options['CPEE_UPDATE_STATUS'], :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated) + @controller.notify("activity/status", :activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :status => options['CPEE_UPDATE_STATUS']) end @handler_continue.continue WEEL::Signal::Again else - @controller.callbacks.delete(@handler_passthrough) + @controller.cancel_callback(@handler_passthrough) @handler_passthrough = nil if options['CPEE_SALVAGE'] @handler_continue.continue WEEL::Signal::Salvage else @handler_continue.continue @@ -279,21 +281,21 @@ end end def test_condition(mr,code) res = mr.instance_eval(code) - @controller.notify("condition/eval", :instance => @controller.instance, :instance_uuid => @controller.uuid, :code => code, :condition => (res ? "true" : "false"), :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated) + @controller.notify("condition/eval", :instance_uuid => @controller.uuid, :code => code, :condition => (res ? "true" : "false")) res end def simulate(type,nesting,tid,parent,parameters={}) #{{{ pp "#{type} - #{nesting} - #{tid} - #{parent} - #{parameters.inspect}" - @controller.call_vote("simulating/step", - :endpoint => @handler_endpoint, - :instance => @controller.instance, - :instance_uuid => @controller.uuid, + @controller.vote("simulating/step", + :activity_uuid => @handler_activity_uuid, + :label => @label, :activity => tid, + :endpoint => @handler_endpoint, :type => type, :nesting => nesting, :parent => parent, :parameters => parameters )