lib/openwfe/expressions/fe_time.rb in openwferu-0.9.3 vs lib/openwfe/expressions/fe_time.rb in openwferu-0.9.4

- old
+ new

@@ -41,10 +41,12 @@ require 'openwfe/rudefinitions' require 'openwfe/utils' require 'openwfe/util/otime' require 'openwfe/util/scheduler' +require 'openwfe/expressions/timeout' +require 'openwfe/expressions/fe_condition' # # expressions like 'sleep' and 'cron' # @@ -69,28 +71,35 @@ # expression # def cancel () synchronize do - ldebug { "cancel() @scheduler_job_id is #{@scheduler_job_id}" } + ldebug { "cancel()..." } - get_scheduler.unschedule(@scheduler_job_id) \ - if @scheduler_job_id + unschedule() return super() end end + + def unschedule () + + ldebug { "unschedule() @scheduler_job_id is #{@scheduler_job_id}" } + + get_scheduler.unschedule(@scheduler_job_id) \ + if @scheduler_job_id + end end # # The 'sleep' expression expects one attribute, either 'for', either # 'until'. # - # <sequence> - # <sleep for="10m12s" /> - # <participant ref="alpha" /> - # </sequence> + # <sequence> + # <sleep for="10m12s" /> + # <participant ref="alpha" /> + # </sequence> # # will wait for 10 minutes and 12 seconds before sending a workitem # to participant 'alpha'. # class SleepExpression < TimeExpression @@ -158,23 +167,35 @@ @scheduler_job_id = scheduler.schedule_at(@awakening_time, self, nil) ldebug do - "[re]schedule() @scheduler_job_id is #{@scheduler_job_id} "+ + "[re]schedule() @scheduler_job_id is '#{@scheduler_job_id}' "+ " (scheduler #{scheduler.object_id})" end store_itself() end end # + # Scheduling subprocesses for repeating execution + # # <cron tab="0 9-17 * * mon-fri" name="//reminder"> # <send-reminder/> # </cron> # + # In this short process definition snippet, the subprocess "send-reminder" + # will get triggered once per hour (minute 0) from 0900 to 1700 and + # this, from monday to friday. + # + # The 'name' of the cron indicates also at which level the cron should + # be bound. A double slash means the cron is bound at engine level (and + # will continue until it is unbound, as long as the engine is up, if the + # engine is a persisted one, the cron will continue when the engine + # restarts). + # class CronExpression < TimeExpression attr_accessor \ :raw_child, :tab, :name @@ -223,10 +244,15 @@ #def cancel () #end # # implemented in parent TimeExpression class + # + # This is the method called each time, the scheduler triggers + # this cron. The contained segment of process will get + # executed. + # def trigger (params) # # launch raw child ldebug { "trigger() cron : #{@fei.to_debug_s}" } @@ -242,19 +268,167 @@ OpenWFE::exception_to_s($!) end end end + # + # This method is called at the first schedule of this expression + # or each time the engine is restarted and this expression has + # to be rescheduled. + # def reschedule (scheduler) - @scheduler_id = get_scheduler.schedule(@tab, @name, self, nil) + @scheduler_job_id = @name.dup - ldebug { "reschedule() job id is #{@scheduler_id}" } + @scheduler_job_id = "#{@fei.wfid}__#{@scheduler_job_id}" \ + if not OpenWFE::starts_with(@name, "//") + get_scheduler.schedule(@tab, @scheduler_job_id, self, nil) + + ldebug { "reschedule() job id is '#{@scheduler_job_id}'" } + #store_itself() # # done by the containing environment itself end + end + + # + # The 'when' expression will trigger a consequence when a condition + # is met, like in + # + # <when test="${variable:over} == true"> + # <participant ref="toto" /> + # </when> + # + # where the participant "toto" will receive a workitem when the (local) + # variable "over" has the value true. + # + class WhenExpression < TimeExpression + include ConditionMixin, TimeoutMixin + + attr_accessor \ + :frequency, + :consequence_triggered + + DEFAULT_FREQUENCY = "20s" + + def apply (workitem) + + if @children.size < 1 + reply_to_parent(workitem) + return + end + + @applied_workitem = workitem.dup + + @frequency = + lookup_attribute(:frequency, workitem, DEFAULT_FREQUENCY) + @frequency = + OpenWFE::parse_time_string(@frequency) + + determine_timeout() + + @consequence_triggered = false + + trigger(nil) + end + + def reply (workitem) + + ldebug do + "reply() @consequence_triggered is '#{@consequence_triggered}'" + end + + if @consequence_triggered + reply_to_parent(workitem) + return + end + + result = OpenWFE::get_result(workitem) + + if result + apply_consequence(workitem) + else + reschedule(get_scheduler) + end + end + + def cancel () + to_unschedule() + super() + end + + def trigger (params) + + ldebug { "trigger() params is #{params}" } + + if params == :do_timeout! + # + # do timeout... + # + reply_to_parent(@applied_workitem) + return + end + + @scheduler_job_id = nil + + evaluate_condition() + end + + def reschedule (scheduler) + + @scheduler_job_id = + scheduler.schedule_in(@frequency, self, nil) + + ldebug { "reschedule() @scheduler_job_id is #{@scheduler_job_id}" } + + to_reschedule(scheduler) + end + + def reply_to_parent (workitem) + + unschedule() + unschedule_timeout() + + super(workitem) + end + + protected + + def evaluate_condition + + if @children.size > 1 + # + # trigger the evaluation of the first (condition) child + + store_itself() + + get_expression_pool.launch_template( + self, @children[0], @applied_workitem) + else + # + # eval the attribute condition immediately + + c = eval_condition(:test, @applied_workitem) + + OpenWFE::set_result(@applied_workitem, c) + + reply(@applied_workitem) + end + end + + def apply_consequence (workitem) + + @consequence_triggered = true + + store_itself() + + i = 1 + i = 0 if @children.size == 1 + + get_expression_pool.apply(@children[i], workitem) + end end end