server/handlerwrappers/default.rb in cpee-1.5.27 vs server/handlerwrappers/default.rb in cpee-2.0
- old
+ new
@@ -11,30 +11,39 @@
# You should have received a copy of the GNU General Public License along with
# CPEE (file COPYING in the main directory). If not, see
# <http://www.gnu.org/licenses/>.
class DefaultHandlerWrapper < WEEL::HandlerWrapperBase
+ def self::loop_guard(arguments,id,count) # {{{
+ controller = arguments[0]
+ tsn = Time.now
+ tso = controller.loop_guard[id][:timestamp] rescue Time.now
+ controller.loop_guard[id] = { :count => count, :timestamp => tsn }
+ # if we have more than 100 loop iterations and the last one took less than 2 seconds, we slow the hell down
+ tso + 2 > tsn && count > 100
+ end # }}}
+
def self::inform_state_change(arguments,newstate) # {{{
controller = arguments[0]
- controller.serialize_state!
- controller.notify("state/change", :instance => controller.instance, :instance_uuid => controller.uuid, :state => newstate, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => controller.attributes_translated)
- controller.finalize_if_finished
+ controller.notify("state/change", :state => newstate, :timestamp => Time.now.xmlschema(3))
end # }}}
+ def self::inform_state_change(arguments,newstate) # {{{
+ controller = arguments[0]
+ controller.notify("state/change", :state => newstate, :timestamp => Time.now.xmlschema(3))
+ end # }}}
def self::inform_syntax_error(arguments,err,code)# {{{
controller = arguments[0]
- controller.notify("description/error", :instance => controller.instance, :instance_uuid => controller.uuid, :message => err.message, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"))
+ controller.notify("description/error", :message => err.message)
end# }}}
def self::inform_handlerwrapper_error(arguments,err) # {{{
controller = arguments[0]
- controller.notify("handlerwrapper/error", :instance => controller.instance, :instance_uuid => controller.uuid, :message => err.message, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"))
+ p err.message
+ p err.backtrace
+ controller.notify("handlerwrapper/error", :message => err.message)
end # }}}
def self::inform_position_change(arguments,ipc={}) # {{{
controller = arguments[0]
- controller.serialize_positions!
- ipc[:instance] = controller.instance
- ipc[:instance_uuid] = controller.uuid
- ipc[:timestamp] = Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z")
controller.notify("position/change", ipc)
end # }}}
def initialize(arguments,position=nil,continue=nil) # {{{
@controller = arguments[0]
@@ -63,14 +72,10 @@
t
end
params
end #}}}
- def additional
- { :attributes => @controller.attributes_translated } rescue {}
- end
-
def proto_curl(parameters) #{{{
params = []
callback = Digest::MD5.hexdigest(Kernel::rand().to_s)
(parameters[:arguments] || []).each do |s|
if s.respond_to?(:mimetype)
@@ -89,11 +94,11 @@
end
end
end
params << Riddl::Header.new("CPEE-BASE",@controller.base_url)
- params << Riddl::Header.new("CPEE-INSTANCE",@controller.instance)
+ params << Riddl::Header.new("CPEE-INSTANCE",@controller.instance_id)
params << Riddl::Header.new("CPEE-INSTANCE-URL",@controller.instance_url)
params << Riddl::Header.new("CPEE-INSTANCE-UUID",@controller.uuid)
params << Riddl::Header.new("CPEE-CALLBACK",@controller.instance_url + '/callbacks/' + callback)
params << Riddl::Header.new("CPEE-CALLBACK-ID",callback)
params << Riddl::Header.new("CPEE-ACTIVITY",@handler_position)
@@ -106,22 +111,22 @@
tendpoint = @handler_endpoint.sub(/^http(s)?-(get|put|post|delete):/,'http\\1:')
type = $2 || parameters[:method] || 'post'
client = Riddl::Client.new(tendpoint)
- @controller.callbacks[callback] = CPEE::Callback.new("callback activity: #{@handler_position}",self,:callback,nil,nil,:http)
+ @controller.callback(self,callback,:activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position)
@handler_passthrough = callback
status, result, headers = client.request type => params
if status < 200 || status >= 300
headers['CPEE_SALVAGE'] = true
c = result[0]&.value
c = c.read if c.respond_to? :read
callback([ Riddl::Parameter::Complex.new('error','application/json',StringIO.new(JSON::generate({ 'status' => status, 'error' => c }))) ], headers)
else
if headers['CPEE_INSTANTIATION']
- @controller.notify("task/instantiation", :activity_uuid => @handler_activity_uuid, :instance => @controller.instance, :label => @label, :instance_name => @controller.info, :instance_uuid => @controller.uuid, :activity => @handler_position, :endpoint => @handler_endpoint, :received => CPEE::ValueHelper.parse(headers['CPEE_INSTANTIATION']), :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated)
+ @controller.notify("task/instantiation", :activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :received => CPEE::ValueHelper.parse(headers['CPEE_INSTANTIATION']))
end
if headers['CPEE_CALLBACK'] && headers['CPEE_CALLBACK'] == 'true' && result.any?
headers['CPEE_UPDATE'] = true
callback result, headers
elsif headers['CPEE_CALLBACK'] && headers['CPEE_CALLBACK'] == 'true' && result.empty?
@@ -136,15 +141,15 @@
raise "Wrong endpoint" if @handler_endpoint.nil? || @handler_endpoint.empty?
@label = parameters[:label]
@sensors = parameters.dig(:stream,:sensors)
@aggregators = parameters.dig(:stream,:aggregators)
@costs = parameters.dig(:stream,:costs)
- @controller.notify("activity/calling", :activity_uuid => @handler_activity_uuid, :instance => @controller.instance, :instance_uuid => @controller.uuid, :label => @label, :instance_name => @controller.info, :activity => @handler_position, :passthrough => passthrough, :endpoint => @handler_endpoint, :parameters => parameters, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated)
+ @controller.notify("activity/calling", :activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position, :passthrough => passthrough, :endpoint => @handler_endpoint, :parameters => parameters)
if passthrough.to_s.empty?
proto_curl parameters
else
- @controller.callbacks[passthrough] = CPEE::Callback.new("callback activity: #{@handler_position}",self,:callback,nil,nil,:http)
+ @controller.callback(self,passthrough,:activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position)
@handler_passthrough = passthrough
end
end # }}}
def activity_manipulate_handle(parameters) #{{{
@label = parameters[:label]
@@ -157,11 +162,11 @@
@handler_returnOptions
end # }}}
def activity_stop # {{{
unless @handler_passthrough.nil?
- @controller.callbacks.delete(@handler_passthrough)
+ @controller.cancel_callback(@handler_passthrough)
end
end # }}}
def activity_passthrough_value # {{{
@handler_passthrough
end # }}}
@@ -169,40 +174,37 @@
def activity_no_longer_necessary # {{{
true
end # }}}
def inform_activity_done # {{{
- @controller.notify("activity/done", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :instance => @controller.instance, :label => @label, :instance_name => @controller.info, :instance_uuid => @controller.uuid, :activity => @handler_position, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated)
+ @controller.notify("activity/done", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position)
end # }}}
def inform_activity_manipulate # {{{
- @controller.notify("activity/manipulating", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :instance => @controller.instance, :label => @label, :instance_name => @controller.info, :instance_uuid => @controller.uuid, :activity => @handler_position, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated)
+ @controller.notify("activity/manipulating", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position)
end # }}}
def inform_activity_failed(err) # {{{
puts err.message
puts err.backtrace
- @controller.notify("activity/failed", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :instance_name => @controller.info, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :message => err.message, :line => err.backtrace[0].match(/(.*?):(\d+):/)[2], :where => err.backtrace[0].match(/(.*?):(\d+):/)[1], :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated)
+ @controller.notify("activity/failed", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :message => err.message, :line => err.backtrace[0].match(/(.*?):(\d+):/)[2], :where => err.backtrace[0].match(/(.*?):(\d+):/)[1])
end # }}}
def inform_manipulate_change(status,changed_dataelements,changed_endpoints,dataelements,endpoints) # {{{
unless status.nil?
- @controller.serialize_status!
- @controller.notify("status/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :instance_name => @controller.info, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :id => status.id, :message => status.message, :attributes => @controller.attributes_translated, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"))
+ @controller.notify("status/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :id => status.id, :message => status.message)
end
unless changed_dataelements.nil?
- @controller.serialize_dataelements!
- @controller.notify("dataelements/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :instance_name => @controller.info, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :changed => changed_dataelements, :values => dataelements, :attributes => @controller.attributes_translated, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"))
+ @controller.notify("dataelements/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :changed => changed_dataelements, :values => dataelements)
end
unless changed_endpoints.nil?
- @controller.serialize_endpoints!
- @controller.notify("endpoints/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :instance_name => @controller.info, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :changed => changed_endpoints, :values => endpoints, :attributes => @controller.attributes_translated, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"))
+ @controller.notify("endpoints/change", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :label => @label, :activity => @handler_position, :changed => changed_endpoints, :values => endpoints)
end
end # }}}
def vote_sync_after # {{{
- @controller.call_vote("activity/syncing_after", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"))
+ @controller.vote("activity/syncing_after", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :activity => @handler_position, :label => @label)
end # }}}
def vote_sync_before(parameters=nil) # {{{
- @controller.call_vote("activity/syncing_before", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :parameters => parameters, :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"))
+ @controller.vote("activity/syncing_before", :activity_uuid => @handler_activity_uuid, :endpoint => @handler_endpoint, :activity => @handler_position, :label => @label, :parameters => parameters)
end # }}}
def simplify_result(result)
if result.length == 1
if result[0].is_a? Riddl::Parameter::Simple
@@ -257,21 +259,21 @@
end
end
end
def callback(result=nil,options={})
- @controller.notify("activity/receiving", :activity_uuid => @handler_activity_uuid, :instance => @controller.instance, :label => @label, :instance_name => @controller.info, :instance_uuid => @controller.uuid, :activity => @handler_position, :endpoint => @handler_endpoint, :received => structurize_result(result), :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated, :sensors => @sensors, :aggregators => @aggregators, :costs => @costs)
+ @controller.notify("activity/receiving", :activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :received => structurize_result(result), :sensors => @sensors, :aggregators => @aggregators, :costs => @costs)
result = simplify_result(result)
@handler_returnValue = result
@handler_returnOptions = options
if options['CPEE_UPDATE']
if options['CPEE_UPDATE_STATUS']
- @controller.notify("activity/status", :activity_uuid => @handler_activity_uuid, :instance => @controller.instance, :instance_uuid => @controller.uuid, :activity => @handler_position, :endpoint => @handler_endpoint, :status => options['CPEE_UPDATE_STATUS'], :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated)
+ @controller.notify("activity/status", :activity_uuid => @handler_activity_uuid, :label => @label, :activity => @handler_position, :endpoint => @handler_endpoint, :status => options['CPEE_UPDATE_STATUS'])
end
@handler_continue.continue WEEL::Signal::Again
else
- @controller.callbacks.delete(@handler_passthrough)
+ @controller.cancel_callback(@handler_passthrough)
@handler_passthrough = nil
if options['CPEE_SALVAGE']
@handler_continue.continue WEEL::Signal::Salvage
else
@handler_continue.continue
@@ -279,21 +281,21 @@
end
end
def test_condition(mr,code)
res = mr.instance_eval(code)
- @controller.notify("condition/eval", :instance => @controller.instance, :instance_uuid => @controller.uuid, :code => code, :condition => (res ? "true" : "false"), :timestamp => Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z"), :attributes => @controller.attributes_translated)
+ @controller.notify("condition/eval", :instance_uuid => @controller.uuid, :code => code, :condition => (res ? "true" : "false"))
res
end
def simulate(type,nesting,tid,parent,parameters={}) #{{{
pp "#{type} - #{nesting} - #{tid} - #{parent} - #{parameters.inspect}"
- @controller.call_vote("simulating/step",
- :endpoint => @handler_endpoint,
- :instance => @controller.instance,
- :instance_uuid => @controller.uuid,
+ @controller.vote("simulating/step",
+ :activity_uuid => @handler_activity_uuid,
+ :label => @label,
:activity => tid,
+ :endpoint => @handler_endpoint,
:type => type,
:nesting => nesting,
:parent => parent,
:parameters => parameters
)