lib/openwfe/expressions/fe_time.rb in openwferu-0.9.3 vs lib/openwfe/expressions/fe_time.rb in openwferu-0.9.4
- old
+ new
@@ -41,10 +41,12 @@
require 'openwfe/rudefinitions'
require 'openwfe/utils'
require 'openwfe/util/otime'
require 'openwfe/util/scheduler'
+require 'openwfe/expressions/timeout'
+require 'openwfe/expressions/fe_condition'
#
# expressions like 'sleep' and 'cron'
#
@@ -69,28 +71,35 @@
# expression
#
def cancel ()
synchronize do
- ldebug { "cancel() @scheduler_job_id is #{@scheduler_job_id}" }
+ ldebug { "cancel()..." }
- get_scheduler.unschedule(@scheduler_job_id) \
- if @scheduler_job_id
+ unschedule()
return super()
end
end
+
+ def unschedule ()
+
+ ldebug { "unschedule() @scheduler_job_id is #{@scheduler_job_id}" }
+
+ get_scheduler.unschedule(@scheduler_job_id) \
+ if @scheduler_job_id
+ end
end
#
# The 'sleep' expression expects one attribute, either 'for', either
# 'until'.
#
- # <sequence>
- # <sleep for="10m12s" />
- # <participant ref="alpha" />
- # </sequence>
+ # <sequence>
+ # <sleep for="10m12s" />
+ # <participant ref="alpha" />
+ # </sequence>
#
# will wait for 10 minutes and 12 seconds before sending a workitem
# to participant 'alpha'.
#
class SleepExpression < TimeExpression
@@ -158,23 +167,35 @@
@scheduler_job_id =
scheduler.schedule_at(@awakening_time, self, nil)
ldebug do
- "[re]schedule() @scheduler_job_id is #{@scheduler_job_id} "+
+ "[re]schedule() @scheduler_job_id is '#{@scheduler_job_id}' "+
" (scheduler #{scheduler.object_id})"
end
store_itself()
end
end
#
+ # Scheduling subprocesses for repeating execution
+ #
# <cron tab="0 9-17 * * mon-fri" name="//reminder">
# <send-reminder/>
# </cron>
#
+ # In this short process definition snippet, the subprocess "send-reminder"
+ # will get triggered once per hour (minute 0) from 0900 to 1700 and
+ # this, from monday to friday.
+ #
+ # The 'name' of the cron indicates also at which level the cron should
+ # be bound. A double slash means the cron is bound at engine level (and
+ # will continue until it is unbound, as long as the engine is up, if the
+ # engine is a persisted one, the cron will continue when the engine
+ # restarts).
+ #
class CronExpression < TimeExpression
attr_accessor \
:raw_child, :tab, :name
@@ -223,10 +244,15 @@
#def cancel ()
#end
#
# implemented in parent TimeExpression class
+ #
+ # This is the method called each time, the scheduler triggers
+ # this cron. The contained segment of process will get
+ # executed.
+ #
def trigger (params)
#
# launch raw child
ldebug { "trigger() cron : #{@fei.to_debug_s}" }
@@ -242,19 +268,167 @@
OpenWFE::exception_to_s($!)
end
end
end
+ #
+ # This method is called at the first schedule of this expression
+ # or each time the engine is restarted and this expression has
+ # to be rescheduled.
+ #
def reschedule (scheduler)
- @scheduler_id = get_scheduler.schedule(@tab, @name, self, nil)
+ @scheduler_job_id = @name.dup
- ldebug { "reschedule() job id is #{@scheduler_id}" }
+ @scheduler_job_id = "#{@fei.wfid}__#{@scheduler_job_id}" \
+ if not OpenWFE::starts_with(@name, "//")
+ get_scheduler.schedule(@tab, @scheduler_job_id, self, nil)
+
+ ldebug { "reschedule() job id is '#{@scheduler_job_id}'" }
+
#store_itself()
#
# done by the containing environment itself
end
+ end
+
+ #
+ # The 'when' expression will trigger a consequence when a condition
+ # is met, like in
+ #
+ # <when test="${variable:over} == true">
+ # <participant ref="toto" />
+ # </when>
+ #
+ # where the participant "toto" will receive a workitem when the (local)
+ # variable "over" has the value true.
+ #
+ class WhenExpression < TimeExpression
+ include ConditionMixin, TimeoutMixin
+
+ attr_accessor \
+ :frequency,
+ :consequence_triggered
+
+ DEFAULT_FREQUENCY = "20s"
+
+ def apply (workitem)
+
+ if @children.size < 1
+ reply_to_parent(workitem)
+ return
+ end
+
+ @applied_workitem = workitem.dup
+
+ @frequency =
+ lookup_attribute(:frequency, workitem, DEFAULT_FREQUENCY)
+ @frequency =
+ OpenWFE::parse_time_string(@frequency)
+
+ determine_timeout()
+
+ @consequence_triggered = false
+
+ trigger(nil)
+ end
+
+ def reply (workitem)
+
+ ldebug do
+ "reply() @consequence_triggered is '#{@consequence_triggered}'"
+ end
+
+ if @consequence_triggered
+ reply_to_parent(workitem)
+ return
+ end
+
+ result = OpenWFE::get_result(workitem)
+
+ if result
+ apply_consequence(workitem)
+ else
+ reschedule(get_scheduler)
+ end
+ end
+
+ def cancel ()
+ to_unschedule()
+ super()
+ end
+
+ def trigger (params)
+
+ ldebug { "trigger() params is #{params}" }
+
+ if params == :do_timeout!
+ #
+ # do timeout...
+ #
+ reply_to_parent(@applied_workitem)
+ return
+ end
+
+ @scheduler_job_id = nil
+
+ evaluate_condition()
+ end
+
+ def reschedule (scheduler)
+
+ @scheduler_job_id =
+ scheduler.schedule_in(@frequency, self, nil)
+
+ ldebug { "reschedule() @scheduler_job_id is #{@scheduler_job_id}" }
+
+ to_reschedule(scheduler)
+ end
+
+ def reply_to_parent (workitem)
+
+ unschedule()
+ unschedule_timeout()
+
+ super(workitem)
+ end
+
+ protected
+
+ def evaluate_condition
+
+ if @children.size > 1
+ #
+ # trigger the evaluation of the first (condition) child
+
+ store_itself()
+
+ get_expression_pool.launch_template(
+ self, @children[0], @applied_workitem)
+ else
+ #
+ # eval the attribute condition immediately
+
+ c = eval_condition(:test, @applied_workitem)
+
+ OpenWFE::set_result(@applied_workitem, c)
+
+ reply(@applied_workitem)
+ end
+ end
+
+ def apply_consequence (workitem)
+
+ @consequence_triggered = true
+
+ store_itself()
+
+ i = 1
+ i = 0 if @children.size == 1
+
+ get_expression_pool.apply(@children[i], workitem)
+ end
end
end