#--
# Copyright (c) 2006-2009, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Made in Japan.
#++
require 'rufus/otime'
require 'openwfe/expressions/timeout'
module OpenWFE
#
# A parent class for CronExpression and SleepExpression, is never
# used directly.
# It contains a simple get_scheduler() method simplifying the scheduler
# localization for and .
#
class TimeExpression < FlowExpression
include Rufus::Schedulable
#
# The workitem received at apply time
#
attr_accessor :applied_workitem
#
# The job_id in the scheduler for this expression
#
attr_accessor :scheduler_job_id
#
# The tags (if any) for the job in the scheduler
#
attr_accessor :scheduler_tags
#
# Makes sure to cancel any scheduler job associated with this
# expression
#
def cancel
unschedule
super()
@applied_workitem
end
#
# If the expression has been scheduled, a call to this method
# will make sure it's unscheduled (removed from the scheduler).
#
def unschedule
ldebug { "unschedule() @scheduler_job_id is #{@scheduler_job_id}" }
sleep get_scheduler.precision + 0.001
#
# make sure not to unschedule before the actual scheduling
# got done.
get_scheduler.unschedule(@scheduler_job_id) \
if @scheduler_job_id
end
protected
#
# looks up potential scheduler tags in the expression
# attributes
#
def determine_scheduler_tags
@scheduler_tags = lookup_array_attribute(
:scheduler_tags, @applied_workitem) || []
@scheduler_tags << self.class.name
@scheduler_tags << fei.to_short_s
@scheduler_tags << fei.parent_wfid
end
end
#
# A parent class for WhenExpression and WaitExpression.
#
# All the code for managing waiting for something to occur is
# concentrated here.
#
class WaitingExpression < TimeExpression
include ConditionMixin
include TimeoutMixin
attr_accessor :frequency
#uses_template
#
# By default, classes extending this class do poll for their
# condition every 10 seconds.
#
DEFAULT_FREQUENCY = '10s'
#
# Don't go under 300 milliseconds.
#
MIN_FREQUENCY = 0.300
#
# Classes extending this WaitingExpression have a 'conditions' class
# method (like 'attr_accessor').
#
def self.conditions (*attnames)
attnames = attnames.collect do |n|
n.to_s.to_sym
end
meta_def :condition_attributes do
attnames
end
end
def apply (workitem)
remove_timedout_flag workitem
@applied_workitem = workitem.dup
@frequency = lookup_attribute(
:frequency, workitem, :default => DEFAULT_FREQUENCY)
@frequency = Rufus::parse_time_string(
@frequency)
@frequency = MIN_FREQUENCY \
if @frequency < MIN_FREQUENCY
determine_timeout(workitem)
determine_scheduler_tags
condition_attribute = determine_condition_attribute(
self.class.condition_attributes)
#
# register consequence
consequence = condition_attribute ?
raw_children[0] : raw_children[1]
consequence = nil if consequence.is_a?(String)
get_expression_pool.tprepare_child(
self,
consequence,
0, # sub_id
:register_child => true,
:dont_store_parent => true
) if consequence
#
# go east...
store_itself
# as :dont_store_parent => true ...
trigger
end
def reply (workitem)
result = workitem.get_result
if result
apply_consequence(workitem)
else
reschedule(get_scheduler)
end
end
#
# Cancels this expression (takes care of unscheduling a timeout
# if there is one).
#
def cancel
unschedule_timeout(nil)
super()
end
def trigger (params={})
ldebug { "trigger() #{@fei.to_debug_s} params : #{params.inspect}" }
if params[:do_timeout!]
#
# do timeout...
#
set_timedout_flag(@applied_workitem)
reply_to_parent(@applied_workitem)
return
end
@scheduler_job_id = nil
evaluate_condition
end
def reschedule (scheduler)
@scheduler_job_id = "waiting_#{fei.to_s}"
scheduler.schedule_in(
@frequency,
{
:schedulable => self,
:job_id => @scheduler_job_id,
:tags => @scheduler_tags })
ldebug { "reschedule() @scheduler_job_id is #{@scheduler_job_id}" }
to_reschedule scheduler
end
def reply_to_parent (workitem)
unschedule
unschedule_timeout(workitem)
super(workitem)
end
protected
#
# The code for the condition evalution is here.
#
# This method is overriden by the WhenExpression.
#
def evaluate_condition
condition_attribute = determine_condition_attribute(
self.class.condition_attributes)
if condition_attribute
c = eval_condition(condition_attribute, @applied_workitem)
do_reply(c)
return
end
# else, condition is nested as a child
if has_no_expression_child
#
# no condition attribute and no child attribute,
# simply reply to parent
#
reply_to_parent(@applied_workitem)
return
end
# trigger the first child (the consequence child)
get_expression_pool.tlaunch_child(
self,
raw_children.first,
(Time.new.to_f * 1000).to_i,
@applied_workitem.dup,
:register_child => false)
end
#
# Used when replying to self after an attribute condition
# got evaluated
#
def do_reply (result)
@applied_workitem.set_result(result)
reply(@applied_workitem)
end
#
# This method is overriden by WhenExpression. WaitExpression
# doesn't override it.
# This default implementation simply directly replies to
# the parent expression.
#
def apply_consequence (workitem)
reply_to_parent(workitem)
end
end
end