lib/openwfe/engine/status_methods.rb in ruote-0.9.18 vs lib/openwfe/engine/status_methods.rb in ruote-0.9.19
- old
+ new
@@ -37,317 +37,341 @@
# John Mettraux at openwfe.org
#
module OpenWFE
+ #
+ # ProcessStatus represents information about the status of a workflow
+ # process instance.
+ #
+ # The status is mainly a list of expressions and a hash of errors.
+ #
+ # Instances of this class are obtained via Engine.process_status().
+ #
+ class ProcessStatus
+
#
- # ProcessStatus represents information about the status of a workflow
- # process instance.
+ # the String workflow instance id of the Process.
#
- # The status is mainly a list of expressions and a hash of errors.
+ attr_reader :wfid
+
#
- # Instances of this class are obtained via Engine.process_status().
+ # The list of the expressions currently active in the process instance.
#
- class ProcessStatus
+ # For instance, if your process definition is currently in a
+ # concurrence, more than one expressions may be listed here.
+ #
+ attr_reader :expressions
- #
- # the String workflow instance id of the Process.
- #
- attr_reader :wfid
+ #
+ # The list of all the expressions in the process (active or not).
+ #
+ attr_reader :all_expressions
- #
- # The list of the expressions currently active in the process instance.
- #
- # For instance, if your process definition is currently in a
- # concurrence, more than one expressions may be listed here.
- #
- attr_reader :expressions
+ #
+ # A hash whose values are ProcessError instances, the keys
+ # are FlowExpressionId instances (fei) (identifying the expressions
+ # that are concerned with the error)
+ #
+ attr_reader :errors
- #
- # A hash whose values are ProcessError instances, the keys
- # are FlowExpressionId instances (fei) (identifying the expressions
- # that are concerned with the error)
- #
- attr_reader :errors
+ #
+ # The time at which the process got launched.
+ #
+ attr_reader :launch_time
- #
- # The time at which the process got launched.
- #
- attr_reader :launch_time
+ #
+ # The variables hash as set in the process environment (the process
+ # scope).
+ #
+ attr_reader :variables
- #
- # The variables hash as set in the process environment (the process
- # scope).
- #
- attr_reader :variables
+ #
+ # The jobs registered for that process instance in the rufus
+ # scheduler used by the engine.
+ #
+ attr_accessor :scheduled_jobs
- #
- # Is the process currently in pause ?
- #
- attr_accessor :paused
+ #
+ # Is the process currently in pause ?
+ #
+ attr_accessor :paused
- #
- # Builds an empty ProcessStatus instance.
- #
- def initialize
+ #
+ # When was this ProcessStatus instance generated ?
+ #
+ attr_accessor :timestamp
- @wfid = nil
- @expressions = []
- @errors = {}
- @launch_time = nil
- @variables = nil
- end
+ #
+ # Builds an empty ProcessStatus instance.
+ #
+ def initialize
- #
- # Returns the workflow definition name for this process.
- #
- def wfname
+ @wfid = nil
+ @expressions = nil
+ @all_expressions = []
+ @errors = {}
+ @launch_time = nil
+ @variables = nil
+ @scheduled_jobs = nil
+ @paused = false
+ @timestamp = Time.now
- @expressions.first.fei.wfname
- end
+ @all_expressions.extend(RepresentationMixin)
+ end
- alias :workflow_definition_name :wfname
+ #
+ # Returns the workflow definition name for this process.
+ #
+ def wfname
- #
- # Returns the workflow definition revision for this process.
- #
- def wfrevision
+ @expressions.first.fei.wfname
+ end
- @expressions.first.fei.wfrevision
- end
+ alias :workflow_definition_name :wfname
- alias :workflow_definition_revision :wfrevision
+ #
+ # Returns the workflow definition revision for this process.
+ #
+ def wfrevision
- #
- # Returns the count of concurrent branches currently active for
- # this process. The typical 'sequential only' process will
- # have a return value of 1 here.
- #
- def branches
+ @expressions.first.fei.wfrevision
+ end
- @expressions.size
- end
+ alias :workflow_definition_revision :wfrevision
- #
- # Returns the tags currently set in this process.
- #
- def tags
+ #
+ # Returns the count of concurrent branches currently active for
+ # this process. The typical 'sequential only' process will
+ # have a return value of 1 here.
+ #
+ def branches
- return [] unless @variables
+ @expressions.size
+ end
- @variables.keys.select do |k|
- @variables[k].is_a?(OpenWFE::RawExpression::Tag)
- end
- end
+ #
+ # Returns the tags currently set in this process.
+ #
+ def tags
- #
- # Returns true if the process is in pause.
- #
- def paused?
+ return [] unless @variables
- #@expressions.first.paused?
- @paused
- end
+ @variables.keys.select do |k|
+ @variables[k].is_a?(OpenWFE::RawExpression::Tag)
+ end
+ end
- #
- # this method is used by Engine.get_process_status() when
- # it prepares its results.
- #
- def << (item)
+ #
+ # Returns true if the process is in pause.
+ #
+ def paused?
- if item.kind_of?(FlowExpression)
- add_expression item
- else
- add_error item
- end
- end
+ #@expressions.first.paused?
+ @paused
+ end
- #
- # A String representation, handy for debugging, quick viewing.
- #
- def to_s
+ #
+ # this method is used by Engine.get_process_status() when
+ # it prepares its results.
+ #
+ def << (item)
- s = []
+ if item.is_a?(FlowExpression)
- s << "-- #{self.class.name} --"
- s << " wfid : #{@wfid}"
- s << " launch_time : #{launch_time}"
- s << " tags : #{tags.join(", ")}"
- s << " errors : #{@errors.size}"
- s << " paused : #{paused?}"
+ @wfid ||= item.fei.parent_wfid
- s << " expressions :"
- @expressions.each do |fexp|
- s << " #{fexp.fei.to_s}"
- end
+ @variables = item.variables \
+ if item.is_a?(Environment) and item.fei.expid == "0"
- s.join "\n"
- end
+ @launch_time ||= item.apply_time \
+ if item.fei.expid == '0' and item.fei.is_in_parent_process?
- protected
+ @all_expressions << item
- def add_expression (fexp)
+ else
- if fexp.is_a?(Environment)
- @variables = fexp.variables if fexp.fei.expid == "0"
- return
- end
+ @errors[item.fei] = item
+ end
+ end
- @wfid ||= fexp.fei.parent_wfid
+ protected
- @launch_time = fexp.apply_time if fexp.fei.expid == '0'
+ #
+ # Prepares the @expressions instance variable. This method
+ # is only called by the process_status method of the Engine.
+ #
+ def pack_expressions
- exps = @expressions
- @expressions = []
+ @expressions = []
- added = false
- @expressions = exps.collect do |fe|
- if added or fe.fei.wfid != fexp.fei.wfid
- fe
- else
- if OpenWFE::starts_with(fexp.fei.expid, fe.fei.expid)
- added = true
- fexp
- elsif OpenWFE::starts_with(fe.fei.expid, fexp.fei.expid)
- added = true
- fe
- else
- fe
- end
- end
- end
- @expressions << fexp unless added
- end
+ @all_expressions.sort_by { |fe| fe.fei.expid }.each do |fe|
- def add_error (error)
+ next unless fe.apply_time
+ # no Environment or RawExpression instances
- @errors[error.fei] = error
- end
- end
+ @expressions.delete_if { |e| e.fei == fe.parent_id }
+ @expressions << fe
+ end
+ end
+ end
+
+ #
+ # just a nice to_s for the ProcessStatuses hash
+ #
+ module StatusesMixin
+
+ attr_accessor :timestamp
+
#
# Renders a nice, terminal oriented, representation of an
# Engine.get_process_status() result.
#
# You usually directly benefit from this when doing
#
- # puts engine.get_process_status.to_s
+ # puts engine.get_process_status.to_s
#
- def OpenWFE.pretty_print_process_status (ps)
+ def to_s
- # TODO : include launch_time and why is process_id so long ?
+ # TODO : include launch_time and why is process_id so long ?
- s = ""
- s << "process_id | name | rev | brn | err | paused? \n"
- s << "--------------------+-------------------+---------+-----+-----+---------\n"
+ s = ""
+ s << "process_id | name | rev | brn | err | paused? \n"
+ s << "--------------------+-------------------+---------+-----+-----+---------\n"
- ps.keys.sort.each do |wfid|
+ self.keys.sort.each do |wfid|
- status = ps[wfid]
- fexp = status.expressions.first
- ffei = fexp.fei
+ status = self[wfid]
+ fexp = status.expressions.first
+ ffei = fexp.fei
- s << "%-19s" % wfid[0, 19]
- s << " | "
- s << "%-17s" % ffei.workflow_definition_name[0, 17]
- s << " | "
- s << "%-7s" % ffei.workflow_definition_revision[0, 7]
- s << " | "
- s << "%3s" % status.expressions.size.to_s[0, 3]
- s << " | "
- s << "%3s" % status.errors.size.to_s[0, 3]
- s << " | "
- s << "%5s" % status.paused?.to_s
- s << "\n"
- end
- s
+ s << "%-19s" % wfid[0, 19]
+ s << " | "
+ s << "%-17s" % ffei.workflow_definition_name[0, 17]
+ s << " | "
+ s << "%-7s" % ffei.workflow_definition_revision[0, 7]
+ s << " | "
+ s << "%3s" % status.expressions.size.to_s[0, 3]
+ s << " | "
+ s << "%3s" % status.errors.size.to_s[0, 3]
+ s << " | "
+ s << "%5s" % status.paused?.to_s
+ s << "\n"
+ end
+
+ s
end
+ end
+ #
+ # This mixin is only included by the Engine class. It contains all
+ # the methods about ProcessStatus.
+ #
+ # Note : it caches process status to avoid too big a load on the
+ # expression storage, the weeping mecha stays here.
+ #
+ module StatusMethods
+
+ def init_status_cache
+
+ @status_cache = LruHash.new(30)
+ @all_status_cache = nil
+
+ get_expression_pool.add_observer(:all) do |event, *args|
+ fei = args.find { |a| a.is_a?(FlowExpressionId) }
+ @status_cache.delete(fei.wfid) if fei
+ @all_status_cache = nil if fei or event == :launch
+ end
+ end
+
#
- # This mixin is only included by the Engine class. It contains all
- # the methods about ProcessStatus.
+ # Returns a hash of ProcessStatus instances. The keys of the hash
+ # are workflow instance ids.
#
- module StatusMethods
+ # A ProcessStatus is a description of the state of a process instance.
+ # It enumerates the expressions where the process is currently
+ # located (waiting certainly) and the errors the process currently
+ # has (hopefully none).
+ #
+ # the :wfid_prefix option is useful when you want to list all the process
+ # for a year (:wfid_prefix => '2007'), a month (:wfid_prefix => '200705') or
+ # a day (:wfid_prefix => '20070529').
+ #
+ def process_statuses (options={})
- #
- # Returns a hash of ProcessStatus instances. The keys of the hash
- # are workflow instance ids.
- #
- # A ProcessStatus is a description of the state of a process instance.
- # It enumerates the expressions where the process is currently
- # located (waiting certainly) and the errors the process currently
- # has (hopefully none).
- #
- def process_statuses (options={})
+ return @all_status_cache if options == {} and @all_status_cache
- options = { :wfid_prefix => options } if options.is_a?(String)
+ init_status_cache unless @status_cache
- result = {}
+ options = { :wfid_prefix => options } if options.is_a?(String)
- expressions = get_expression_storage.find_expressions options
+ expressions = get_expression_storage.find_expressions(options)
- expressions.each do |fexp|
+ result = expressions.inject({}) do |r, fe|
- next unless (fexp.apply_time or fexp.is_a?(Environment))
+ #next unless (fe.apply_time or fe.is_a?(Environment))
+ #next if fe.fei.wfid == '0' # skip the engine env
- next if fexp.fei.wfid == "0" # skip the engine env
+ (r[fe.fei.parent_wfid] ||= ProcessStatus.new) << fe
+ r
+ end
- #(result[fexp.fei.parent_wfid] ||= ProcessStatus.new) << fexp
+ result.values.each do |ps|
- parent_wfid = fexp.fei.parent_wfid
+ ps.paused = (get_expool.paused_instances[ps.wfid] != nil)
- ps = result[parent_wfid]
+ get_error_journal.get_error_log(ps.wfid).each { |er| ps << er }
- if not ps
+ ps.send :pack_expressions # letting it protected
- ps = ProcessStatus.new
+ ps.scheduled_jobs = get_scheduler.find_jobs(ps.wfid)
- ps.paused =
- (get_expool.paused_instances[parent_wfid] != nil)
+ if ps.expressions.size == 0
+ # drop result if there are no expressions
+ result.delete(ps.wfid)
+ @status_cache.delete(ps.wfid)
+ else
+ @status_cache[ps.wfid] = ps
+ end
+ end
- result[parent_wfid] = ps
- end
+ #
+ # done
- ps << fexp
- end
+ result.extend(StatusesMixin)
+ result.timestamp = Time.now
- result.values.each do |ps|
- get_error_journal.get_error_log(ps.wfid).each do |error|
- ps << error
- end
- end
+ @all_status_cache = result
+ # cache and return
+ end
- class << result
- def to_s
- OpenWFE::pretty_print_process_status(self)
- end
- end
+ #
+ # list_process_status() will be deprecated at release 1.0.0
+ #
+ alias :list_process_status :process_statuses
- result
- end
+ #
+ # Returns the process status of one given process instance.
+ #
+ def process_status (wfid)
- #
- # list_process_status() will be deprecated at release 1.0.0
- #
- alias :list_process_status :process_statuses
+ init_status_cache unless @status_cache
- #
- # Returns the process status of one given process instance.
- #
- def process_status (wfid)
+ (r = @status_cache[wfid]) and return r
- wfid = extract_wfid wfid, true
+ wfid = extract_wfid(wfid, true)
- process_statuses(:wfid => wfid).values[0]
- end
+ process_statuses(:wfid_prefix => wfid).values.first
+ end
- #
- # Returns true if the process is in pause.
- #
- def is_paused? (wfid)
+ #
+ # Returns true if the process is in pause.
+ #
+ def is_paused? (wfid)
- (get_expression_pool.paused_instances[wfid] != nil)
- end
+ (get_expression_pool.paused_instances[wfid] != nil)
end
+ end
end