lib/openwfe/engine/status_methods.rb in ruote-0.9.18 vs lib/openwfe/engine/status_methods.rb in ruote-0.9.19

- old
+ new

@@ -37,317 +37,341 @@ # John Mettraux at openwfe.org # module OpenWFE + # + # ProcessStatus represents information about the status of a workflow + # process instance. + # + # The status is mainly a list of expressions and a hash of errors. + # + # Instances of this class are obtained via Engine.process_status(). + # + class ProcessStatus + # - # ProcessStatus represents information about the status of a workflow - # process instance. + # the String workflow instance id of the Process. # - # The status is mainly a list of expressions and a hash of errors. + attr_reader :wfid + # - # Instances of this class are obtained via Engine.process_status(). + # The list of the expressions currently active in the process instance. # - class ProcessStatus + # For instance, if your process definition is currently in a + # concurrence, more than one expressions may be listed here. + # + attr_reader :expressions - # - # the String workflow instance id of the Process. - # - attr_reader :wfid + # + # The list of all the expressions in the process (active or not). + # + attr_reader :all_expressions - # - # The list of the expressions currently active in the process instance. - # - # For instance, if your process definition is currently in a - # concurrence, more than one expressions may be listed here. - # - attr_reader :expressions + # + # A hash whose values are ProcessError instances, the keys + # are FlowExpressionId instances (fei) (identifying the expressions + # that are concerned with the error) + # + attr_reader :errors - # - # A hash whose values are ProcessError instances, the keys - # are FlowExpressionId instances (fei) (identifying the expressions - # that are concerned with the error) - # - attr_reader :errors + # + # The time at which the process got launched. + # + attr_reader :launch_time - # - # The time at which the process got launched. - # - attr_reader :launch_time + # + # The variables hash as set in the process environment (the process + # scope). + # + attr_reader :variables - # - # The variables hash as set in the process environment (the process - # scope). - # - attr_reader :variables + # + # The jobs registered for that process instance in the rufus + # scheduler used by the engine. + # + attr_accessor :scheduled_jobs - # - # Is the process currently in pause ? - # - attr_accessor :paused + # + # Is the process currently in pause ? + # + attr_accessor :paused - # - # Builds an empty ProcessStatus instance. - # - def initialize + # + # When was this ProcessStatus instance generated ? + # + attr_accessor :timestamp - @wfid = nil - @expressions = [] - @errors = {} - @launch_time = nil - @variables = nil - end + # + # Builds an empty ProcessStatus instance. + # + def initialize - # - # Returns the workflow definition name for this process. - # - def wfname + @wfid = nil + @expressions = nil + @all_expressions = [] + @errors = {} + @launch_time = nil + @variables = nil + @scheduled_jobs = nil + @paused = false + @timestamp = Time.now - @expressions.first.fei.wfname - end + @all_expressions.extend(RepresentationMixin) + end - alias :workflow_definition_name :wfname + # + # Returns the workflow definition name for this process. + # + def wfname - # - # Returns the workflow definition revision for this process. - # - def wfrevision + @expressions.first.fei.wfname + end - @expressions.first.fei.wfrevision - end + alias :workflow_definition_name :wfname - alias :workflow_definition_revision :wfrevision + # + # Returns the workflow definition revision for this process. + # + def wfrevision - # - # Returns the count of concurrent branches currently active for - # this process. The typical 'sequential only' process will - # have a return value of 1 here. - # - def branches + @expressions.first.fei.wfrevision + end - @expressions.size - end + alias :workflow_definition_revision :wfrevision - # - # Returns the tags currently set in this process. - # - def tags + # + # Returns the count of concurrent branches currently active for + # this process. The typical 'sequential only' process will + # have a return value of 1 here. + # + def branches - return [] unless @variables + @expressions.size + end - @variables.keys.select do |k| - @variables[k].is_a?(OpenWFE::RawExpression::Tag) - end - end + # + # Returns the tags currently set in this process. + # + def tags - # - # Returns true if the process is in pause. - # - def paused? + return [] unless @variables - #@expressions.first.paused? - @paused - end + @variables.keys.select do |k| + @variables[k].is_a?(OpenWFE::RawExpression::Tag) + end + end - # - # this method is used by Engine.get_process_status() when - # it prepares its results. - # - def << (item) + # + # Returns true if the process is in pause. + # + def paused? - if item.kind_of?(FlowExpression) - add_expression item - else - add_error item - end - end + #@expressions.first.paused? + @paused + end - # - # A String representation, handy for debugging, quick viewing. - # - def to_s + # + # this method is used by Engine.get_process_status() when + # it prepares its results. + # + def << (item) - s = [] + if item.is_a?(FlowExpression) - s << "-- #{self.class.name} --" - s << " wfid : #{@wfid}" - s << " launch_time : #{launch_time}" - s << " tags : #{tags.join(", ")}" - s << " errors : #{@errors.size}" - s << " paused : #{paused?}" + @wfid ||= item.fei.parent_wfid - s << " expressions :" - @expressions.each do |fexp| - s << " #{fexp.fei.to_s}" - end + @variables = item.variables \ + if item.is_a?(Environment) and item.fei.expid == "0" - s.join "\n" - end + @launch_time ||= item.apply_time \ + if item.fei.expid == '0' and item.fei.is_in_parent_process? - protected + @all_expressions << item - def add_expression (fexp) + else - if fexp.is_a?(Environment) - @variables = fexp.variables if fexp.fei.expid == "0" - return - end + @errors[item.fei] = item + end + end - @wfid ||= fexp.fei.parent_wfid + protected - @launch_time = fexp.apply_time if fexp.fei.expid == '0' + # + # Prepares the @expressions instance variable. This method + # is only called by the process_status method of the Engine. + # + def pack_expressions - exps = @expressions - @expressions = [] + @expressions = [] - added = false - @expressions = exps.collect do |fe| - if added or fe.fei.wfid != fexp.fei.wfid - fe - else - if OpenWFE::starts_with(fexp.fei.expid, fe.fei.expid) - added = true - fexp - elsif OpenWFE::starts_with(fe.fei.expid, fexp.fei.expid) - added = true - fe - else - fe - end - end - end - @expressions << fexp unless added - end + @all_expressions.sort_by { |fe| fe.fei.expid }.each do |fe| - def add_error (error) + next unless fe.apply_time + # no Environment or RawExpression instances - @errors[error.fei] = error - end - end + @expressions.delete_if { |e| e.fei == fe.parent_id } + @expressions << fe + end + end + end + + # + # just a nice to_s for the ProcessStatuses hash + # + module StatusesMixin + + attr_accessor :timestamp + # # Renders a nice, terminal oriented, representation of an # Engine.get_process_status() result. # # You usually directly benefit from this when doing # - # puts engine.get_process_status.to_s + # puts engine.get_process_status.to_s # - def OpenWFE.pretty_print_process_status (ps) + def to_s - # TODO : include launch_time and why is process_id so long ? + # TODO : include launch_time and why is process_id so long ? - s = "" - s << "process_id | name | rev | brn | err | paused? \n" - s << "--------------------+-------------------+---------+-----+-----+---------\n" + s = "" + s << "process_id | name | rev | brn | err | paused? \n" + s << "--------------------+-------------------+---------+-----+-----+---------\n" - ps.keys.sort.each do |wfid| + self.keys.sort.each do |wfid| - status = ps[wfid] - fexp = status.expressions.first - ffei = fexp.fei + status = self[wfid] + fexp = status.expressions.first + ffei = fexp.fei - s << "%-19s" % wfid[0, 19] - s << " | " - s << "%-17s" % ffei.workflow_definition_name[0, 17] - s << " | " - s << "%-7s" % ffei.workflow_definition_revision[0, 7] - s << " | " - s << "%3s" % status.expressions.size.to_s[0, 3] - s << " | " - s << "%3s" % status.errors.size.to_s[0, 3] - s << " | " - s << "%5s" % status.paused?.to_s - s << "\n" - end - s + s << "%-19s" % wfid[0, 19] + s << " | " + s << "%-17s" % ffei.workflow_definition_name[0, 17] + s << " | " + s << "%-7s" % ffei.workflow_definition_revision[0, 7] + s << " | " + s << "%3s" % status.expressions.size.to_s[0, 3] + s << " | " + s << "%3s" % status.errors.size.to_s[0, 3] + s << " | " + s << "%5s" % status.paused?.to_s + s << "\n" + end + + s end + end + # + # This mixin is only included by the Engine class. It contains all + # the methods about ProcessStatus. + # + # Note : it caches process status to avoid too big a load on the + # expression storage, the weeping mecha stays here. + # + module StatusMethods + + def init_status_cache + + @status_cache = LruHash.new(30) + @all_status_cache = nil + + get_expression_pool.add_observer(:all) do |event, *args| + fei = args.find { |a| a.is_a?(FlowExpressionId) } + @status_cache.delete(fei.wfid) if fei + @all_status_cache = nil if fei or event == :launch + end + end + # - # This mixin is only included by the Engine class. It contains all - # the methods about ProcessStatus. + # Returns a hash of ProcessStatus instances. The keys of the hash + # are workflow instance ids. # - module StatusMethods + # A ProcessStatus is a description of the state of a process instance. + # It enumerates the expressions where the process is currently + # located (waiting certainly) and the errors the process currently + # has (hopefully none). + # + # the :wfid_prefix option is useful when you want to list all the process + # for a year (:wfid_prefix => '2007'), a month (:wfid_prefix => '200705') or + # a day (:wfid_prefix => '20070529'). + # + def process_statuses (options={}) - # - # Returns a hash of ProcessStatus instances. The keys of the hash - # are workflow instance ids. - # - # A ProcessStatus is a description of the state of a process instance. - # It enumerates the expressions where the process is currently - # located (waiting certainly) and the errors the process currently - # has (hopefully none). - # - def process_statuses (options={}) + return @all_status_cache if options == {} and @all_status_cache - options = { :wfid_prefix => options } if options.is_a?(String) + init_status_cache unless @status_cache - result = {} + options = { :wfid_prefix => options } if options.is_a?(String) - expressions = get_expression_storage.find_expressions options + expressions = get_expression_storage.find_expressions(options) - expressions.each do |fexp| + result = expressions.inject({}) do |r, fe| - next unless (fexp.apply_time or fexp.is_a?(Environment)) + #next unless (fe.apply_time or fe.is_a?(Environment)) + #next if fe.fei.wfid == '0' # skip the engine env - next if fexp.fei.wfid == "0" # skip the engine env + (r[fe.fei.parent_wfid] ||= ProcessStatus.new) << fe + r + end - #(result[fexp.fei.parent_wfid] ||= ProcessStatus.new) << fexp + result.values.each do |ps| - parent_wfid = fexp.fei.parent_wfid + ps.paused = (get_expool.paused_instances[ps.wfid] != nil) - ps = result[parent_wfid] + get_error_journal.get_error_log(ps.wfid).each { |er| ps << er } - if not ps + ps.send :pack_expressions # letting it protected - ps = ProcessStatus.new + ps.scheduled_jobs = get_scheduler.find_jobs(ps.wfid) - ps.paused = - (get_expool.paused_instances[parent_wfid] != nil) + if ps.expressions.size == 0 + # drop result if there are no expressions + result.delete(ps.wfid) + @status_cache.delete(ps.wfid) + else + @status_cache[ps.wfid] = ps + end + end - result[parent_wfid] = ps - end + # + # done - ps << fexp - end + result.extend(StatusesMixin) + result.timestamp = Time.now - result.values.each do |ps| - get_error_journal.get_error_log(ps.wfid).each do |error| - ps << error - end - end + @all_status_cache = result + # cache and return + end - class << result - def to_s - OpenWFE::pretty_print_process_status(self) - end - end + # + # list_process_status() will be deprecated at release 1.0.0 + # + alias :list_process_status :process_statuses - result - end + # + # Returns the process status of one given process instance. + # + def process_status (wfid) - # - # list_process_status() will be deprecated at release 1.0.0 - # - alias :list_process_status :process_statuses + init_status_cache unless @status_cache - # - # Returns the process status of one given process instance. - # - def process_status (wfid) + (r = @status_cache[wfid]) and return r - wfid = extract_wfid wfid, true + wfid = extract_wfid(wfid, true) - process_statuses(:wfid => wfid).values[0] - end + process_statuses(:wfid_prefix => wfid).values.first + end - # - # Returns true if the process is in pause. - # - def is_paused? (wfid) + # + # Returns true if the process is in pause. + # + def is_paused? (wfid) - (get_expression_pool.paused_instances[wfid] != nil) - end + (get_expression_pool.paused_instances[wfid] != nil) end + end end