# #-- # Copyright (c) 2006-2007, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # . Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # . Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. 
#++
#
# $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $
#

#
# "made in Japan"
#
# John Mettraux at openwfe.org
#

require 'openwfe/rudefinitions'
require 'openwfe/utils'
require 'openwfe/util/otime'
require 'openwfe/util/scheduler'
require 'openwfe/expressions/timeout'
require 'openwfe/expressions/condition'


#
# Time related expressions : 'sleep', 'cron' and 'when'.
#
module OpenWFE

  #
  # Parent class of the time related expressions defined in this file
  # (SleepExpression, CronExpression and WhenExpression); it is never
  # used directly.
  #
  # It keeps track of the workitem the expression was applied with and of
  # the id of the job registered in the scheduler, and it makes sure that
  # job gets unscheduled when the expression is cancelled.
  #
  class TimeExpression < FlowExpression
    include Schedulable

    attr_accessor :applied_workitem, :scheduler_job_id

    #
    # Makes sure to cancel any scheduler job associated with this
    # expression, then hands over to the parent class implementation
    # (whose result is returned).
    #
    def cancel
      synchronize do
        ldebug { "cancel()..." }
        unschedule
        return super()
      end
    end

    #
    # Removes the job of this expression from the scheduler.
    # Does nothing when no job id has been recorded.
    #
    def unschedule
      ldebug { "unschedule() @scheduler_job_id is #{@scheduler_job_id}" }

      get_scheduler.unschedule(@scheduler_job_id) if @scheduler_job_id
    end
  end

  #
  # The 'sleep' expression expects one attribute, either 'for' or 'until'.
  #
  # For example (illustrative) :
  #
  #   <sequence>
  #     <sleep for="10m12s" />
  #     <participant ref="alpha" />
  #   </sequence>
  #
  # will wait for 10 minutes and 12 seconds before sending a workitem
  # to participant 'alpha'.
  #
  # When neither attribute is given, the expression replies to its parent
  # immediately.
  #
  class SleepExpression < TimeExpression

    attr_accessor :awakening_time

    #
    # Determines the awakening time and registers this expression in the
    # scheduler.
    #
    # 'until' takes precedence over 'for'. The 'until' value is handed
    # as is to the scheduler; the 'for' value is parsed as a duration and
    # added to the current time (float seconds).
    #
    def apply(workitem)
      sfor = lookup_attribute(:for, workitem)
      suntil = lookup_attribute(:until, workitem)

      tuntil =
        if suntil
          suntil
        elsif sfor
          tfor = OpenWFE::parse_time_string(sfor)
          ldebug { "apply() tfor is '#{tfor}'" }
          Time.new.to_f + tfor
        end

      unless tuntil
        # nothing to wait for, resume the flow immediately
        reply_to_parent(workitem)
        return
      end

      @awakening_time = tuntil
      @applied_workitem = workitem.dup

      reschedule(get_scheduler)
    end

    #
    # This is the method called by the Scheduler instance attached to
    # the workflow engine when the 'sleep' of this expression is
    # over. The flow resumes with the workitem saved at apply time.
    #
    def trigger(params)
      ldebug do
        "trigger() #{@fei.to_debug_s} waking up (#{Time.new.to_f}) " +
          "(scheduler #{get_scheduler.object_id})"
      end

      reply_to_parent(@applied_workitem)
    end

    #
    # [Re]schedules this expression, effectively registering it within
    # the scheduler.
    # This method is called when the expression is applied and each
    # time the owning engine restarts.
    #
    # Does nothing as long as no awakening time has been determined.
    #
    def reschedule(scheduler)
      return unless @awakening_time

      ldebug do
        "[re]schedule() " +
          "will sleep until '#{@awakening_time}' " +
          "(#{OpenWFE::to_iso8601_date(@awakening_time)})"
      end

      @scheduler_job_id = scheduler.schedule_at(@awakening_time, self, nil)

      ldebug do
        "[re]schedule() @scheduler_job_id is '#{@scheduler_job_id}' " +
          " (scheduler #{scheduler.object_id})"
      end

      # persist the job id along with the expression
      store_itself
    end
  end

  #
  # Scheduling subprocesses for repeating execution.
  #
  # For example (illustrative) :
  #
  #   <cron tab="0 9-17 * * mon-fri" name="//reminder">
  #     <send-reminder />
  #   </cron>
  #
  # In this short process definition snippet, the subprocess "send-reminder"
  # will get triggered once per hour (minute 0) from 0900 to 1700 and
  # this, from monday to friday.
  #
  # The 'name' of the cron indicates also at which level the cron should
  # be bound. A double slash means the cron is bound at engine level (and
  # will continue until it is unbound, as long as the engine is up; if the
  # engine is a persisted one, the cron will continue when the engine
  # restarts).
  #
  class CronExpression < TimeExpression

    attr_accessor :raw_child, :tab, :name, :counter

    #
    # Detaches the first child (the segment of process to run at each
    # trigger), schedules this expression according to its 'tab'
    # attribute, binds it as a variable when it has a 'name' and
    # immediately resumes the flow.
    #
    # A cron without any child simply replies to its parent.
    #
    def apply(workitem)
      @counter = 0

      if @children.size < 1
        reply_to_parent(workitem)
        return
      end

      @applied_workitem = workitem.dup
      @applied_workitem.flow_expression_id = nil

      @tab = lookup_attribute(:tab, workitem)
      @name = lookup_attribute(:name, workitem)

      # keep the raw child around, detached from this expression
      @raw_child, _fei = get_expression_pool.fetch(@children[0])
      @raw_child.parent_id = nil

      clean_children
      @children = nil

      # schedule self
      reschedule(get_scheduler)

      # store self as a variable
      # (has to be done after the reschedule, so that the schedule
      # info is stored within the variable)
      set_variable(@name, self) if @name

      # resume flow
      reply_to_parent(workitem)
    end

    #
    # Replies are silently discarded (this method should never get
    # called though).
    #
    def reply(workitem)
    end

    # cancel() is implemented in the parent TimeExpression class

    #
    # This is the method called each time the scheduler triggers
    # this cron. The contained segment of process gets launched with a
    # copy of the workitem saved at apply time.
    #
    # Any error raised by the launch is logged and not propagated.
    #
    def trigger(params)
      ldebug { "trigger() cron : #{@fei.to_debug_s}" }

      @raw_child.application_context = @application_context

      begin
        get_expression_pool.launch_template(
          @fei.wfid, @counter, @raw_child, @applied_workitem.dup)

        # update count and store self
        @counter += 1
        set_variable(@name, self) if @name
      rescue => e
        # the exception is captured in a local : the logging block may be
        # evaluated later than this rescue clause
        lerror do
          "trigger() cron caught exception\n" +
            OpenWFE::exception_to_s(e)
        end
      end
    end

    #
    # This method is called at the first schedule of this expression
    # or each time the engine is restarted and this expression has
    # to be rescheduled.
    #
    # The job id is the cron name when it starts with "//", else the name
    # prefixed with the workflow instance id. A cron without a 'name'
    # gets a job id derived from its flow expression id.
    #
    # The expression does not store itself here, this is done by the
    # containing environment.
    #
    def reschedule(scheduler)
      base = @name || @fei.to_s

      @scheduler_job_id =
        if OpenWFE::starts_with(base, "//")
          base.dup
        else
          "#{@fei.wfid}__#{base}"
        end

      # use the scheduler handed by the caller, like the sibling
      # expressions do
      (scheduler || get_scheduler).schedule(
        @tab, @scheduler_job_id, self, nil)

      ldebug { "reschedule() job id is '#{@scheduler_job_id}'" }
    end
  end

  #
  # The 'when' expression will trigger a consequence when a condition
  # is met.
  #
  # With two children, the first one is the condition and the second one
  # the consequence. With a single child, the condition is read from the
  # 'test' attribute and the child is the consequence.
  #
  # The condition is evaluated at apply time and then again at each
  # 'frequency' (by default every DEFAULT_FREQUENCY) until it holds or
  # until the expression times out.
  #
  class WhenExpression < TimeExpression
    include ConditionMixin, TimeoutMixin

    attr_accessor :frequency, :consequence_triggered

    DEFAULT_FREQUENCY = "20s"

    #
    # Reads the polling frequency, sets up the timeout and performs a
    # first evaluation of the condition.
    #
    # A 'when' without any child simply replies to its parent.
    #
    def apply(workitem)
      remove_timedout_flag(workitem)

      if @children.size < 1
        reply_to_parent(workitem)
        return
      end

      @applied_workitem = workitem.dup

      @frequency = OpenWFE::parse_time_string(
        lookup_attribute(:frequency, workitem, DEFAULT_FREQUENCY))

      determine_timeout

      @consequence_triggered = false

      trigger(nil)
    end

    #
    # Receives either the outcome of the condition or, once the
    # consequence got triggered, the reply of the consequence.
    #
    def reply(workitem)
      ldebug do
        "reply() @consequence_triggered is '#{@consequence_triggered}'"
      end

      if @consequence_triggered
        # the consequence is over, so is this expression
        reply_to_parent(workitem)
        return
      end

      if OpenWFE::get_result(workitem)
        apply_consequence(workitem)
      else
        # condition not met, try again later
        reschedule(get_scheduler)
      end
    end

    #
    # Unschedules the timeout job before handing over to the parent
    # class, which takes care of the polling job.
    #
    def cancel
      to_unschedule
      super()
    end

    #
    # Called by the scheduler : either for a timeout (params is
    # :do_timeout!) or for a new evaluation of the condition.
    #
    def trigger(params)
      ldebug { "trigger() params is #{params}" }

      if params == :do_timeout!
        # do timeout...
        set_timedout_flag(@applied_workitem)
        reply_to_parent(@applied_workitem)
        return
      end

      # the polling job has just fired, forget its id
      @scheduler_job_id = nil

      evaluate_condition
    end

    #
    # Schedules the next evaluation of the condition (in 'frequency'
    # seconds) and [re]schedules the timeout.
    #
    def reschedule(scheduler)
      @scheduler_job_id = scheduler.schedule_in(@frequency, self, nil)

      ldebug { "reschedule() @scheduler_job_id is #{@scheduler_job_id}" }

      to_reschedule(scheduler)
    end

    #
    # Cleans up the polling job and the timeout job before replying to
    # the parent expression.
    #
    def reply_to_parent(workitem)
      unschedule
      unschedule_timeout

      super(workitem)
    end

    protected

    #
    # Evaluates the condition : launches the first child when there are
    # several children, else evaluates the 'test' attribute right away
    # and feeds the outcome to reply().
    #
    def evaluate_condition
      if @children.size > 1
        # trigger the evaluation of the first (condition) child
        store_itself

        get_expression_pool.launch_template(
          self, @children[0], @applied_workitem)
      else
        # eval the attribute condition immediately
        outcome = eval_condition(:test, @applied_workitem)
        OpenWFE::set_result(@applied_workitem, outcome)

        reply(@applied_workitem)
      end
    end

    #
    # Flags the consequence as triggered, stores this expression and
    # applies the consequence child (the second child, or the only
    # child when there is just one).
    #
    def apply_consequence(workitem)
      @consequence_triggered = true

      store_itself

      index = @children.size == 1 ? 0 : 1

      get_expression_pool.apply(@children[index], workitem)
    end
  end
end