lib/ruote/exp/fe_participant.rb in ruote-2.2.0 vs lib/ruote/exp/fe_participant.rb in ruote-2.3.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -115,11 +115,11 @@
#
class ParticipantExpression < FlowExpression
names :participant
- # Should return true when the dispatch was successful.
+ # Should yield true when the dispatch was successful.
#
h_reader :dispatched
h_reader :participant
@@ -129,36 +129,48 @@
# determine participant
h.participant_name = (attribute(:ref) || attribute_text).to_s
raise ArgumentError.new(
- "no participant name specified"
+ 'no participant name specified'
) if h.participant_name == ''
- participant_info =
- h.participant ||
+ h.participant ||=
@context.plist.lookup_info(h.participant_name, h.applied_workitem)
- unless participant_info.respond_to?(:consume)
- h.participant = participant_info
- end
-
raise(ArgumentError.new(
"no participant named #{h.participant_name.inspect}")
- ) if participant_info.nil?
+ ) if h.participant.nil?
#
- # participant found, consider timeout
+ # trigger on_apply if the participant sports it
- schedule_timeout(participant_info)
+ pa = @context.plist.instantiate(
+ h.participant, :if_respond_to? => :on_apply)
+ Ruote.participant_send(
+ pa, :on_apply, 'workitem' => Ruote::Workitem.new(h.applied_workitem)
+ ) if pa
+
#
# dispatch to participant
h.applied_workitem['participant_name'] = h.participant_name
+
h.applied_workitem['fields']['params'] = compile_atts
+ h.applied_workitem['fields'].delete('t')
+ h.applied_workitem['fields'].delete('__result__')
+
+ h.applied_workitem['re_dispatch_count'] = 0
+
+ if tree.last.any?
+ h.applied_workitem['fields']['params']['__children__'] = dsub(tree.last)
+ end
+
+ consider_participant_timers(h.participant)
+
persist_or_raise
@context.storage.put_msg(
'dispatch',
'fei' => h.fei,
@@ -167,10 +179,12 @@
'workitem' => h.applied_workitem)
end
def cancel(flavour)
+ cancel_flanks(flavour)
+
return reply_to_parent(h.applied_workitem) unless h.participant_name
# no participant, reply immediately
do_persist || return
#
@@ -192,11 +206,13 @@
h.participant ||
@context.plist.lookup_info(h.participant_name, workitem)
pa = @context.plist.instantiate(pinfo, :if_respond_to? => :on_reply)
- pa.on_reply(Ruote::Workitem.new(workitem)) if pa
+ Ruote.participant_send(
+ pa, :on_reply, 'workitem' => Ruote::Workitem.new(workitem)
+ ) if pa
super(workitem)
end
def reply_to_parent(workitem)
@@ -220,35 +236,81 @@
h.dispatched = true
do_persist
# let's not care if it fails...
end
- # Overriden with an empty behaviour. The work is now done a bit later
- # via the #schedule_timeout method.
- #
- def consider_timeout
- end
-
# Determines and schedules timeout if any.
#
# Note that process definition timeout has priority over participant
# specified timeout.
#
- def schedule_timeout(p_info)
+ def consider_participant_timers(p_info)
- timeout = attribute(:timeout)
+ return if h.has_timers
+ # process definition takes precedence over participant defined timers.
- unless timeout
+ timers = nil
- pa = @context.plist.instantiate(p_info, :if_respond_to? => :rtimeout)
+ [ :rtimers, :timers, :rtimeout ].each do |meth|
- timeout = (pa.method(:rtimeout).arity == 0 ?
- pa.rtimeout :
- pa.rtimeout(Ruote::Workitem.new(h.applied_workitem))
- ) if pa
+ pa = @context.plist.instantiate(p_info, :if_respond_to? => meth)
+
+ next unless pa
+
+ timers = Ruote.participant_send(
+ pa, meth, 'workitem' => Ruote::Workitem.new(h.applied_workitem))
+
+ break if timers
end
- do_schedule_timeout(timeout)
+ return unless timers
+
+ timers = if timers.index(':')
+ timers.split(/,/)
+ else
+ [ "#{timers}: timeout" ]
+ end
+
+ schedule_timers(timers)
+ end
+
+ def do_pause(msg)
+
+ return if h.state != nil
+
+ h['state'] = 'paused'
+ h['breakpoint'] = true if msg['breakpoint']
+
+ do_persist || return
+
+ @context.storage.put_msg(
+ 'dispatch_pause',
+ 'fei' => h.fei,
+ 'participant_name' => h.participant_name,
+ 'participant' => h.participant
+ ) unless msg['breakpoint']
+ end
+
+ def do_resume(msg)
+
+ return if h.state != 'paused'
+
+ h['state'] = nil
+ replies = h.delete('paused_replies') || []
+
+ do_persist || return
+
+ if replies.empty?
+ @context.storage.put_msg(
+ 'dispatch_resume',
+ 'fei' => h.fei,
+ 'participant_name' => h.participant_name,
+ 'participant' => h.participant
+ ) unless h['breakpoint']
+ else
+ replies.each { |m| @context.storage.put_msg(m.delete('action'), m) }
+ # trigger replies
+ end
end
end
end