#
#--
# Copyright (c) 2006-2007, John Mettraux, OpenWFE.org
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# . Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# . Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# . Neither the name of the "OpenWFE" nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#++
#
# $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $
#
#
# "made in Japan"
#
# John Mettraux at openwfe.org
#
require 'openwfe/rudefinitions'
require 'openwfe/utils'
require 'openwfe/util/otime'
require 'openwfe/util/scheduler'
require 'openwfe/expressions/timeout'
require 'openwfe/expressions/fe_condition'
#
# expressions like 'sleep' and 'cron'
#
module OpenWFE
#
# A parent class for CronExpression and SleepExpression; it is never
# used directly.
# It contains a simple get_scheduler() method simplifying the scheduler
# localization for the 'sleep' and 'cron' expressions.
#
class TimeExpression < FlowExpression
    include Schedulable

    #
    # applied_workitem : the workitem kept by the expression while it waits
    # scheduler_job_id : the id of the job registered in the scheduler
    # (nil when no job is registered)
    #
    attr_accessor :applied_workitem, :scheduler_job_id

    #
    # Cancels this expression : within a synchronize block, the scheduler
    # job (if any) is unscheduled first, then the parent class
    # implementation of cancel() is invoked and its result returned.
    #
    def cancel
        synchronize do
            ldebug { "cancel()..." }
            unschedule
            return super()
        end
    end

    #
    # Asks the scheduler to drop the job identified by @scheduler_job_id.
    # Does nothing (and returns nil) when no job id is set.
    #
    def unschedule
        ldebug { "unschedule() @scheduler_job_id is #{@scheduler_job_id}" }

        return unless @scheduler_job_id

        get_scheduler.unschedule(@scheduler_job_id)
    end
end
#
# The 'sleep' expression expects one attribute, either 'for' or
# 'until'.
#
#     <sequence>
#         <sleep for="10m12s" />
#         <participant ref="alpha" />
#     </sequence>
#
# will wait for 10 minutes and 12 seconds before sending a workitem
# to participant 'alpha'.
#
class SleepExpression < TimeExpression

    #
    # the point in time at which this expression has to wake up
    #
    attr_accessor :awakening_time

    #
    # Looks up the 'for' and 'until' attributes; 'until' wins when both
    # are present. With 'for', the awakening time is "now" (as a float)
    # plus the parsed duration. When neither attribute is given, the
    # workitem is immediately handed back to the parent expression.
    # Else a copy of the workitem is kept and the expression gets
    # registered in the scheduler.
    #
    def apply(workitem)
        synchronize do
            for_value = lookup_attribute(:for, workitem)
            until_value = lookup_attribute(:until, workitem)

            wake_at =
                if until_value
                    until_value
                elsif for_value
                    delay = OpenWFE::parse_time_string(for_value)
                    ldebug { "apply() tfor is '#{delay}'" }
                    Time.new.to_f + delay
                end

            unless wake_at
                reply_to_parent(workitem)
                return
            end

            @awakening_time = wake_at
            @applied_workitem = workitem.dup

            reschedule(get_scheduler)
        end
    end

    #def reply (workitem)
    #end

    #
    # Called by the scheduler when the sleep is over : the workitem
    # kept at apply time is replied to the parent expression.
    #
    def trigger(params)
        ldebug do
            "trigger() #{@fei.to_debug_s} waking up (#{Time.new.to_f}) (scheduler #{get_scheduler.object_id})"
        end

        reply_to_parent(@applied_workitem)
    end

    #
    # [Re]registers this expression in the given scheduler so that it
    # gets triggered at @awakening_time, remembers the resulting job id
    # and then stores the expression.
    # Called at apply time; per the original notes, also each time the
    # owning engine restarts.
    #
    def reschedule(scheduler)
        ldebug do
            "[re]schedule() will sleep until '#{@awakening_time}' (#{OpenWFE::to_iso8601_date(@awakening_time)})"
        end

        @scheduler_job_id = scheduler.schedule_at(@awakening_time, self, nil)

        ldebug do
            "[re]schedule() @scheduler_job_id is '#{@scheduler_job_id}'  (scheduler #{scheduler.object_id})"
        end

        store_itself
    end
end
#
# Scheduling subprocesses for repeating execution
#
#     <cron tab="0 9-17 * * mon-fri" name="//reminder">
#         <send-reminder/>
#     </cron>
#
# In this short process definition snippet, the subprocess "send-reminder"
# will get triggered once per hour (minute 0) from 0900 to 1700,
# from Monday to Friday.
#
# The 'name' of the cron indicates also at which level the cron should
# be bound. A double slash means the cron is bound at engine level (and
# will continue until it is unbound, as long as the engine is up, if the
# engine is a persisted one, the cron will continue when the engine
# restarts).
#
class CronExpression < TimeExpression

    #
    # raw_child : the (detached) child expression launched at each trigger
    # tab       : the value of the 'tab' attribute, handed to the scheduler
    # name      : the value of the 'name' attribute, used as job id and
    #             as the name of the variable this expression is stored in
    #
    attr_accessor \
        :raw_child, :tab, :name

    #
    # Sets the cron up and immediately resumes the flow.
    #
    # Without any child there is nothing to schedule, the workitem is
    # simply replied to the parent expression.
    #
    def apply(workitem)

        if @children.size < 1
            reply_to_parent(workitem)
            return
        end

        # keep a copy of the workitem, stripped of its flow expression id,
        # it will be duplicated again for each triggered execution
        @applied_workitem = workitem.dup
        @applied_workitem.flow_expression_id = nil

        @tab = lookup_attribute(:tab, workitem)
        @name = lookup_attribute(:name, workitem)

        # fetch the first child and detach it from this expression
        @raw_child, _fei = get_expression_pool.fetch(@children[0])
        @raw_child.parent_id = nil

        clean_children()
        @children = nil

        #
        # schedule self

        reschedule(get_scheduler)

        #
        # store self as a variable
        # (have to do it after the reschedule, so that the schedule
        # info is stored within the variable)

        set_variable(@name, self)

        #
        # resume flow

        reply_to_parent(workitem)
    end

    def reply(workitem)
        # discard silently... should never get called though
    end

    #def cancel ()
    #end
        #
        # implemented in parent TimeExpression class

    #
    # This is the method called each time the scheduler triggers
    # this cron. The contained segment of process will get
    # executed.
    #
    # Any StandardError raised by the launch is logged and not
    # propagated to the caller.
    #
    def trigger(params)

        #
        # launch raw child

        ldebug { "trigger() cron : #{@fei.to_debug_s}" }

        @raw_child.application_context = @application_context

        begin

            get_expression_pool.launch_template(
                @fei.wfid, @raw_child, @applied_workitem.dup)

        rescue => e

            # the exception is captured in a local so that the log
            # message does not depend on the value of the global $!
            # at the moment the logging block gets evaluated
            lerror do
                "trigger() cron caught exception\n"+
                OpenWFE::exception_to_s(e)
            end
        end
    end

    #
    # This method is called at the first schedule of this expression
    # or each time the engine is restarted and this expression has
    # to be rescheduled.
    #
    # The job id is the cron name, prefixed with the workflow instance id
    # and "__" unless the name starts with "//".
    #
    # The job is registered in the scheduler passed as argument (as the
    # other time expressions do); get_scheduler is only used as a
    # fallback when no scheduler is given.
    #
    def reschedule(scheduler)

        @scheduler_job_id = @name.dup

        @scheduler_job_id = "#{@fei.wfid}__#{@scheduler_job_id}" \
            if not OpenWFE::starts_with(@name, "//")

        (scheduler || get_scheduler).schedule(
            @tab, @scheduler_job_id, self, nil)

        ldebug { "reschedule() job id is '#{@scheduler_job_id}'" }

        #store_itself()
            #
            # done by the containing environment itself
    end
end
#
# The 'when' expression will trigger a consequence when a condition
# is met, like in
#
#     <when test="${over} == true">
#         <participant ref="toto" />
#     </when>
#
# where the participant "toto" will receive a workitem when the (local)
# variable "over" has the value true.
#
class WhenExpression < TimeExpression
    include ConditionMixin, TimeoutMixin

    #
    # frequency             : the delay between two evaluations of the
    #                         condition (parsed time string)
    # consequence_triggered : set to true once the consequence got applied
    #
    attr_accessor :frequency, :consequence_triggered

    #
    # the frequency used when no 'frequency' attribute is given
    #
    DEFAULT_FREQUENCY = "20s"

    #
    # Replies immediately to the parent when there is no child, else
    # keeps a copy of the workitem, determines frequency and timeout
    # and performs a first evaluation of the condition.
    #
    def apply(workitem)
        if @children.size < 1
            reply_to_parent(workitem)
            return
        end

        @applied_workitem = workitem.dup

        @frequency = lookup_attribute(:frequency, workitem, DEFAULT_FREQUENCY)
        @frequency = OpenWFE::parse_time_string(@frequency)

        determine_timeout
        @consequence_triggered = false

        trigger(nil)
    end

    #
    # Receives either the outcome of the condition or the reply of the
    # consequence. Once the consequence has been triggered, the workitem
    # goes back to the parent; before that, a true result applies the
    # consequence while a false one schedules a new evaluation.
    #
    def reply(workitem)
        ldebug { "reply() @consequence_triggered is '#{@consequence_triggered}'" }

        if @consequence_triggered
            reply_to_parent(workitem)
            return
        end

        if OpenWFE::get_result(workitem)
            apply_consequence(workitem)
        else
            reschedule(get_scheduler)
        end
    end

    #
    # Calls to_unschedule() and then hands over to the parent class
    # cancel() implementation.
    #
    def cancel
        to_unschedule
        super()
    end

    #
    # Called by the scheduler (and once by apply()). When params is
    # :do_timeout!, the kept workitem is replied to the parent, else
    # the job id is reset and the condition gets evaluated.
    #
    def trigger(params)
        ldebug { "trigger() params is #{params}" }

        timed_out = (params == :do_timeout!)

        unless timed_out
            @scheduler_job_id = nil
            return evaluate_condition
        end

        #
        # do timeout...
        #
        reply_to_parent(@applied_workitem)
        nil
    end

    #
    # Schedules the next evaluation of the condition in @frequency,
    # then calls to_reschedule() with the same scheduler.
    #
    def reschedule(scheduler)
        @scheduler_job_id = scheduler.schedule_in(@frequency, self, nil)

        ldebug { "reschedule() @scheduler_job_id is #{@scheduler_job_id}" }

        to_reschedule(scheduler)
    end

    #
    # Unschedules the polling job and the timeout before replying
    # to the parent expression.
    #
    def reply_to_parent(workitem)
        unschedule
        unschedule_timeout
        super(workitem)
    end

    protected

        #
        # With more than one child, the first child is the condition and
        # gets launched (after this expression is stored). Else the
        # 'test' attribute is evaluated right away and its outcome is
        # passed to reply().
        #
        def evaluate_condition
            unless @children.size > 1
                #
                # eval the attribute condition immediately
                outcome = eval_condition(:test, @applied_workitem)
                OpenWFE::set_result(@applied_workitem, outcome)
                return reply(@applied_workitem)
            end

            #
            # trigger the evaluation of the first (condition) child
            store_itself
            get_expression_pool.launch_template(
                self, @children[0], @applied_workitem)
        end

        #
        # Flags the consequence as triggered, stores this expression and
        # applies the consequence : the only child when there is just
        # one, the second child otherwise.
        #
        def apply_consequence(workitem)
            @consequence_triggered = true
            store_itself

            index = (@children.size == 1) ? 0 : 1

            get_expression_pool.apply(@children[index], workitem)
        end
end
end